You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/08/06 21:17:50 UTC

[incubator-druid] branch master updated: Order rows during incremental index persist when rollup is disabled. (#6107)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6267721  Order rows during incremental index persist when rollup is disabled. (#6107)
6267721 is described below

commit 62677212cc72e3024cb9f1e72455155e5b746d38
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Mon Aug 6 14:17:48 2018 -0700

    Order rows during incremental index persist when rollup is disabled. (#6107)
    
    * order using IncrementalIndexRowComparator at persist time when rollup is disabled, allowing increased effectiveness of dimension compression, resolves #6066
    
    * fix stuff from review
---
 .../druid/benchmark/datagen/BenchmarkSchemas.java  | 50 ++++++++++++++
 .../benchmark/indexing/IndexPersistBenchmark.java  | 67 +++++++++---------
 .../segment/incremental/IncrementalIndex.java      | 66 +++++++++++++++---
 .../incremental/IncrementalIndexAdapter.java       |  2 +-
 .../incremental/IncrementalIndexRowIterator.java   |  2 +-
 .../incremental/OffheapIncrementalIndex.java       |  2 +-
 .../incremental/OnheapIncrementalIndex.java        |  2 +-
 .../druid/segment/data/IncrementalIndexTest.java   |  2 +-
 .../incremental/IncrementalIndexAdapterTest.java   | 79 ++++++++++++++++++++++
 9 files changed, 226 insertions(+), 46 deletions(-)

diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
index d50c1a0..775e94b 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -157,4 +157,54 @@ public class BenchmarkSchemas
     );
     SCHEMA_MAP.put("simpleFloat", basicSchema);
   }
+
+  static { // schema with high opportunity for rollup
+    List<BenchmarkColumnSchema> rolloColumns = ImmutableList.of(
+        // dims
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimEnumerated",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimEnumerated2",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 100, 2.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100),
+
+        // metrics
+        BenchmarkColumnSchema.makeZipf("metLongZipf", ValueType.LONG, true, 1, null, 0, 10000, 2.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
+        BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
+        BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.5)
+    );
+
+    List<AggregatorFactory> rolloSchemaIngestAggs = new ArrayList<>();
+    rolloSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
+    rolloSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
+    rolloSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+    rolloSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
+    rolloSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+    rolloSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
+    Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
+
+    BenchmarkSchemaInfo rolloSchema = new BenchmarkSchemaInfo(
+        rolloColumns,
+        rolloSchemaIngestAggs,
+        basicSchemaDataInterval,
+        true
+    );
+    SCHEMA_MAP.put("rollo", rolloSchema);
+  }
 }
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
index 6d5e172..c299b09 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -32,7 +32,6 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.IndexSpec;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.serde.ComplexMetrics;
@@ -64,44 +63,38 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class IndexPersistBenchmark
 {
-  @Param({"75000"})
-  private int rowsPerSegment;
-
-  @Param({"basic"})
-  private String schema;
-
-  @Param({"true", "false"})
-  private boolean rollup;
-
+  public static final ObjectMapper JSON_MAPPER;
   private static final Logger log = new Logger(IndexPersistBenchmark.class);
   private static final int RNG_SEED = 9999;
-
-  private IncrementalIndex incIndex;
-  private ArrayList<InputRow> rows;
-  private BenchmarkSchemaInfo schemaInfo;
-
-
   private static final IndexMergerV9 INDEX_MERGER_V9;
   private static final IndexIO INDEX_IO;
-  public static final ObjectMapper JSON_MAPPER;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
         OffHeapMemorySegmentWriteOutMediumFactory.instance(),
-        new ColumnConfig()
-        {
-          @Override
-          public int columnCacheSizeBytes()
-          {
-            return 0;
-          }
-        }
+        () -> 0
     );
     INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
   }
 
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"rollo"})
+  private String schema;
+
+  @Param({"true", "false"})
+  private boolean rollup;
+
+  @Param({"none", "moderate", "high"})
+  private String rollupOpportunity;
+
+  private IncrementalIndex incIndex;
+  private ArrayList<InputRow> rows;
+  private BenchmarkSchemaInfo schemaInfo;
+
   @Setup
   public void setup()
   {
@@ -114,11 +107,23 @@ public class IndexPersistBenchmark
     rows = new ArrayList<InputRow>();
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
 
+    int valuesPerTimestamp = 1;
+    switch (rollupOpportunity) {
+      case "moderate":
+        valuesPerTimestamp = 1000;
+        break;
+      case "high":
+        valuesPerTimestamp = 10000;
+        break;
+
+    }
+
     BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
-        schemaInfo.getDataInterval(),
-        rowsPerSegment
+        schemaInfo.getDataInterval().getStartMillis(),
+        valuesPerTimestamp,
+        1000.0
     );
 
     for (int i = 0; i < rowsPerSegment; i++) {
@@ -128,8 +133,6 @@ public class IndexPersistBenchmark
       }
       rows.add(row);
     }
-
-
   }
 
   @Setup(Level.Iteration)
@@ -154,9 +157,9 @@ public class IndexPersistBenchmark
     return new IncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
-            .withMetrics(schemaInfo.getAggsArray())
-            .withRollup(rollup)
-            .build()
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(rollup)
+                .build()
         )
         .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index cdf8957..0faeaef 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -80,6 +80,7 @@ import javax.annotation.concurrent.GuardedBy;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
@@ -94,6 +95,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 /**
  */
@@ -366,9 +368,24 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     @VisibleForTesting
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      this.incrementalIndexSchema = new IncrementalIndexSchema.Builder()
-          .withMetrics(metrics)
-          .build();
+      return setSimpleTestingIndexSchema(null, metrics);
+    }
+
+
+    /**
+     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+     * would use it in production settings.
+     *
+     * @param metrics variable array of {@link AggregatorFactory} metrics
+     *
+     * @return this
+     */
+    @VisibleForTesting
+    public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+    {
+      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
       return this;
     }
 
@@ -1203,6 +1220,12 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     Iterable<IncrementalIndexRow> keySet();
 
     /**
+     * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator<IncrementalIndexRow>}
+     * @return
+     */
+    Iterable<IncrementalIndexRow> persistIterable();
+
+    /**
      * @return the previous rowIndex associated with the specified key, or
      * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
      */
@@ -1290,6 +1313,13 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     }
 
     @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      // with rollup, facts are already pre-sorted so just return keyset
+      return keySet();
+    }
+
+    @Override
     public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
     {
       // setRowIndex() must be called before facts.putIfAbsent() for visibility of rowIndex from concurrent readers.
@@ -1310,7 +1340,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     private final boolean sortFacts;
     private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
 
-    public PlainFactsHolder(boolean sortFacts)
+    private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
+
+    public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
     {
       this.sortFacts = sortFacts;
       if (sortFacts) {
@@ -1318,6 +1350,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
       } else {
         this.facts = new ConcurrentHashMap<>();
       }
+      this.incrementalIndexRowComparator = incrementalIndexRowComparator;
     }
 
     @Override
@@ -1351,10 +1384,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     public Iterator<IncrementalIndexRow> iterator(boolean descending)
     {
       if (descending && sortFacts) {
-        return concat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
+        return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
                 .descendingMap().values(), true).iterator();
       }
-      return concat(facts.values(), false).iterator();
+      return timeOrderedConcat(facts.values(), false).iterator();
     }
 
     @Override
@@ -1363,10 +1396,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
       ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
           ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).subMap(timeStart, timeEnd);
       final Map<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
-      return concat(rangeMap.values(), descending);
+      return timeOrderedConcat(rangeMap.values(), descending);
     }
 
-    private Iterable<IncrementalIndexRow> concat(
+    private Iterable<IncrementalIndexRow> timeOrderedConcat(
         final Iterable<Deque<IncrementalIndexRow>> iterable,
         final boolean descending
     )
@@ -1379,10 +1412,25 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
       );
     }
 
+    private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
+        final Collection<Deque<IncrementalIndexRow>> rowGroups
+    )
+    {
+      return rowGroups.stream()
+                      .flatMap(Collection::stream)
+                      .sorted(incrementalIndexRowComparator);
+    }
+
     @Override
     public Iterable<IncrementalIndexRow> keySet()
     {
-      return concat(facts.values(), false);
+      return timeOrderedConcat(facts.values(), false);
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
     }
 
     @Override
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
index 40ad4ec..59856fa 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
@@ -93,7 +93,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
   )
   {
     int rowNum = 0;
-    for (IncrementalIndexRow row : index.getFacts().keySet()) {
+    for (IncrementalIndexRow row : index.getFacts().persistIterable()) {
       final Object[] dims = row.getDims();
 
       for (IncrementalIndex.DimensionDesc dimension : dimensions) {
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
index 803460c..f3634c2 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
@@ -50,7 +50,7 @@ class IncrementalIndexRowIterator implements TransformableRowIterator
 
   IncrementalIndexRowIterator(IncrementalIndex<?> incrementalIndex)
   {
-    this.timeAndDimsIterator = incrementalIndex.getFacts().keySet().iterator();
+    this.timeAndDimsIterator = incrementalIndex.getFacts().persistIterable().iterator();
     this.currentRowPointer = makeRowPointer(incrementalIndex, currentRowHolder, currentRowNumCounter);
     // markedRowPointer doesn't actually need to be a RowPointer (just a TimeAndDimsPointer), but we create a RowPointer
     // in order to reuse the makeRowPointer() method. Passing a dummy RowNumCounter.
diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
index 472adfd..198137b 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -84,7 +84,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
     this.bufferPool = bufferPool;
 
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new PlainFactsHolder(sortFacts);
+                                                   : new PlainFactsHolder(sortFacts, dimsComparator());
 
     //check that stupid pool gives buffers that can hold at least one row's aggregators
     ResourceHolder<ByteBuffer> bb = bufferPool.take();
diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
index 0b86dd0..c08f579 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -79,7 +79,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
     this.maxRowCount = maxRowCount;
     this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new PlainFactsHolder(sortFacts);
+                                                   : new PlainFactsHolder(sortFacts, dimsComparator());
     maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
   }
 
diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
index 0d683d4..0ee0d07 100644
--- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
@@ -240,7 +240,7 @@ public class IncrementalIndexTest
     }
 
     return new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(aggregatorFactories)
+        .setSimpleTestingIndexSchema(false, aggregatorFactories)
         .setMaxRowCount(1000000)
         .buildOnheap();
   }
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 55f0546..7c24bce 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.incremental;
 
+import io.druid.java.util.common.StringUtils;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.RowIterator;
@@ -32,6 +33,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 public class IncrementalIndexAdapterTest
 {
@@ -83,4 +85,81 @@ public class IncrementalIndexAdapterTest
     Assert.assertEquals(0, (long) rowNums.get(0));
     Assert.assertEquals(1, (long) rowNums.get(1));
   }
+
+  @Test
+  public void testGetRowsIterableNoRollup() throws Exception
+  {
+    final long timestamp = System.currentTimeMillis();
+    IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+
+
+    ArrayList<Integer> dim1Vals = new ArrayList<>();
+    for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+      dim1Vals.add(((int[]) row.getDims()[0])[0]);
+    }
+    ArrayList<Integer> dim2Vals = new ArrayList<>();
+    for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+      dim2Vals.add(((int[]) row.getDims()[1])[0]);
+    }
+
+    final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
+        toPersist1.getInterval(),
+        toPersist1,
+        INDEX_SPEC.getBitmapSerdeFactory()
+                  .getBitmapFactory()
+    );
+
+    RowIterator rows = incrementalAdapter.getRows();
+    List<String> rowStrings = new ArrayList<>();
+    while (rows.moveToNext()) {
+      rowStrings.add(rows.getPointer().toString());
+    }
+
+    Function<Integer, String> getExpected = (rowNumber) -> {
+      if (rowNumber < 3) {
+        return StringUtils.format(
+            "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=1, dim2=2}, metrics={count=1}}",
+            rowNumber,
+            timestamp
+        );
+      } else {
+        return StringUtils.format(
+            "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=3, dim2=4}, metrics={count=1}}",
+            rowNumber,
+            timestamp
+        );
+      }
+    };
+
+
+    // without sorting, output would be
+    //    RowPointer{indexNum=0, rowNumber=0, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=1, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=2, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=3, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=4, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=5, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // but with sorting, output should be
+    //    RowPointer{indexNum=0, rowNumber=0, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=1, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=2, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=3, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=4, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=5, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+
+    Assert.assertEquals(6, rowStrings.size());
+    for (int i = 0; i < 6; i++) {
+      if (i % 2 == 0) {
+        Assert.assertEquals(0, (long) dim1Vals.get(i));
+        Assert.assertEquals(0, (long) dim2Vals.get(i));
+      } else {
+        Assert.assertEquals(1, (long) dim1Vals.get(i));
+        Assert.assertEquals(1, (long) dim2Vals.get(i));
+      }
+      Assert.assertEquals(getExpected.apply(i), rowStrings.get(i));
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org