You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/08/06 21:17:50 UTC
[incubator-druid] branch master updated: Order rows during
incremental index persist when rollup is disabled. (#6107)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6267721 Order rows during incremental index persist when rollup is disabled. (#6107)
6267721 is described below
commit 62677212cc72e3024cb9f1e72455155e5b746d38
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Mon Aug 6 14:17:48 2018 -0700
Order rows during incremental index persist when rollup is disabled. (#6107)
* order using IncrementalIndexRowComparator at persist time when rollup is disabled, allowing increased effectiveness of dimension compression, resolves #6066
* fix stuff from review
---
.../druid/benchmark/datagen/BenchmarkSchemas.java | 50 ++++++++++++++
.../benchmark/indexing/IndexPersistBenchmark.java | 67 +++++++++---------
.../segment/incremental/IncrementalIndex.java | 66 +++++++++++++++---
.../incremental/IncrementalIndexAdapter.java | 2 +-
.../incremental/IncrementalIndexRowIterator.java | 2 +-
.../incremental/OffheapIncrementalIndex.java | 2 +-
.../incremental/OnheapIncrementalIndex.java | 2 +-
.../druid/segment/data/IncrementalIndexTest.java | 2 +-
.../incremental/IncrementalIndexAdapterTest.java | 79 ++++++++++++++++++++++
9 files changed, 226 insertions(+), 46 deletions(-)
diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
index d50c1a0..775e94b 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -157,4 +157,54 @@ public class BenchmarkSchemas
);
SCHEMA_MAP.put("simpleFloat", basicSchema);
}
+
+ static { // schema with high opportunity for rollup
+ List<BenchmarkColumnSchema> rolloColumns = ImmutableList.of(
+ // dims
+ BenchmarkColumnSchema.makeEnumerated(
+ "dimEnumerated",
+ ValueType.STRING,
+ false,
+ 1,
+ null,
+ Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
+ Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+ ),
+ BenchmarkColumnSchema.makeEnumerated(
+ "dimEnumerated2",
+ ValueType.STRING,
+ false,
+ 1,
+ null,
+ Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
+ Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+ ),
+ BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 100, 2.0),
+ BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100),
+
+ // metrics
+ BenchmarkColumnSchema.makeZipf("metLongZipf", ValueType.LONG, true, 1, null, 0, 10000, 2.0),
+ BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
+ BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
+ BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.5)
+ );
+
+ List<AggregatorFactory> rolloSchemaIngestAggs = new ArrayList<>();
+ rolloSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
+ rolloSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
+ rolloSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+ rolloSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
+ rolloSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+ rolloSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
+ Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
+
+ BenchmarkSchemaInfo rolloSchema = new BenchmarkSchemaInfo(
+ rolloColumns,
+ rolloSchemaIngestAggs,
+ basicSchemaDataInterval,
+ true
+ );
+ SCHEMA_MAP.put("rollo", rolloSchema);
+ }
}
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
index 6d5e172..c299b09 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -32,7 +32,6 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
-import io.druid.segment.column.ColumnConfig;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.serde.ComplexMetrics;
@@ -64,44 +63,38 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 25)
public class IndexPersistBenchmark
{
- @Param({"75000"})
- private int rowsPerSegment;
-
- @Param({"basic"})
- private String schema;
-
- @Param({"true", "false"})
- private boolean rollup;
-
+ public static final ObjectMapper JSON_MAPPER;
private static final Logger log = new Logger(IndexPersistBenchmark.class);
private static final int RNG_SEED = 9999;
-
- private IncrementalIndex incIndex;
- private ArrayList<InputRow> rows;
- private BenchmarkSchemaInfo schemaInfo;
-
-
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
- public static final ObjectMapper JSON_MAPPER;
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
- new ColumnConfig()
- {
- @Override
- public int columnCacheSizeBytes()
- {
- return 0;
- }
- }
+ () -> 0
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
+ @Param({"75000"})
+ private int rowsPerSegment;
+
+ @Param({"rollo"})
+ private String schema;
+
+ @Param({"true", "false"})
+ private boolean rollup;
+
+ @Param({"none", "moderate", "high"})
+ private String rollupOpportunity;
+
+ private IncrementalIndex incIndex;
+ private ArrayList<InputRow> rows;
+ private BenchmarkSchemaInfo schemaInfo;
+
@Setup
public void setup()
{
@@ -114,11 +107,23 @@ public class IndexPersistBenchmark
rows = new ArrayList<InputRow>();
schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ int valuesPerTimestamp = 1;
+ switch (rollupOpportunity) {
+ case "moderate":
+ valuesPerTimestamp = 1000;
+ break;
+ case "high":
+ valuesPerTimestamp = 10000;
+ break;
+
+ }
+
BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
- schemaInfo.getDataInterval(),
- rowsPerSegment
+ schemaInfo.getDataInterval().getStartMillis(),
+ valuesPerTimestamp,
+ 1000.0
);
for (int i = 0; i < rowsPerSegment; i++) {
@@ -128,8 +133,6 @@ public class IndexPersistBenchmark
}
rows.add(row);
}
-
-
}
@Setup(Level.Iteration)
@@ -154,9 +157,9 @@ public class IndexPersistBenchmark
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
- .withMetrics(schemaInfo.getAggsArray())
- .withRollup(rollup)
- .build()
+ .withMetrics(schemaInfo.getAggsArray())
+ .withRollup(rollup)
+ .build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index cdf8957..0faeaef 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -80,6 +80,7 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
@@ -94,6 +95,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
/**
*/
@@ -366,9 +368,24 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
@VisibleForTesting
public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
- this.incrementalIndexSchema = new IncrementalIndexSchema.Builder()
- .withMetrics(metrics)
- .build();
+ return setSimpleTestingIndexSchema(null, metrics);
+ }
+
+
+ /**
+ * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+ * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+ * would use it in production settings.
+ *
+ * @param metrics variable array of {@link AggregatorFactory} metrics
+ *
+ * @return this
+ */
+ @VisibleForTesting
+ public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+ {
+ IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+ this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
return this;
}
@@ -1203,6 +1220,12 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
Iterable<IncrementalIndexRow> keySet();
/**
+ * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator<IncrementalIndexRow>}
+ * @return
+ */
+ Iterable<IncrementalIndexRow> persistIterable();
+
+ /**
* @return the previous rowIndex associated with the specified key, or
* {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
*/
@@ -1290,6 +1313,13 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
}
@Override
+ public Iterable<IncrementalIndexRow> persistIterable()
+ {
+ // with rollup, facts are already pre-sorted so just return keyset
+ return keySet();
+ }
+
+ @Override
public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
{
// setRowIndex() must be called before facts.putIfAbsent() for visibility of rowIndex from concurrent readers.
@@ -1310,7 +1340,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
private final boolean sortFacts;
private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
- public PlainFactsHolder(boolean sortFacts)
+ private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
+
+ public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
{
this.sortFacts = sortFacts;
if (sortFacts) {
@@ -1318,6 +1350,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
} else {
this.facts = new ConcurrentHashMap<>();
}
+ this.incrementalIndexRowComparator = incrementalIndexRowComparator;
}
@Override
@@ -1351,10 +1384,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public Iterator<IncrementalIndexRow> iterator(boolean descending)
{
if (descending && sortFacts) {
- return concat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
+ return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
.descendingMap().values(), true).iterator();
}
- return concat(facts.values(), false).iterator();
+ return timeOrderedConcat(facts.values(), false).iterator();
}
@Override
@@ -1363,10 +1396,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).subMap(timeStart, timeEnd);
final Map<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
- return concat(rangeMap.values(), descending);
+ return timeOrderedConcat(rangeMap.values(), descending);
}
- private Iterable<IncrementalIndexRow> concat(
+ private Iterable<IncrementalIndexRow> timeOrderedConcat(
final Iterable<Deque<IncrementalIndexRow>> iterable,
final boolean descending
)
@@ -1379,10 +1412,25 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
);
}
+ private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
+ final Collection<Deque<IncrementalIndexRow>> rowGroups
+ )
+ {
+ return rowGroups.stream()
+ .flatMap(Collection::stream)
+ .sorted(incrementalIndexRowComparator);
+ }
+
@Override
public Iterable<IncrementalIndexRow> keySet()
{
- return concat(facts.values(), false);
+ return timeOrderedConcat(facts.values(), false);
+ }
+
+ @Override
+ public Iterable<IncrementalIndexRow> persistIterable()
+ {
+ return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
}
@Override
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
index 40ad4ec..59856fa 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
@@ -93,7 +93,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
)
{
int rowNum = 0;
- for (IncrementalIndexRow row : index.getFacts().keySet()) {
+ for (IncrementalIndexRow row : index.getFacts().persistIterable()) {
final Object[] dims = row.getDims();
for (IncrementalIndex.DimensionDesc dimension : dimensions) {
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
index 803460c..f3634c2 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
@@ -50,7 +50,7 @@ class IncrementalIndexRowIterator implements TransformableRowIterator
IncrementalIndexRowIterator(IncrementalIndex<?> incrementalIndex)
{
- this.timeAndDimsIterator = incrementalIndex.getFacts().keySet().iterator();
+ this.timeAndDimsIterator = incrementalIndex.getFacts().persistIterable().iterator();
this.currentRowPointer = makeRowPointer(incrementalIndex, currentRowHolder, currentRowNumCounter);
// markedRowPointer doesn't actually need to be a RowPointer (just a TimeAndDimsPointer), but we create a RowPointer
// in order to reuse the makeRowPointer() method. Passing a dummy RowNumCounter.
diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
index 472adfd..198137b 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -84,7 +84,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
this.bufferPool = bufferPool;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
- : new PlainFactsHolder(sortFacts);
+ : new PlainFactsHolder(sortFacts, dimsComparator());
//check that stupid pool gives buffers that can hold at least one row's aggregators
ResourceHolder<ByteBuffer> bb = bufferPool.take();
diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
index 0b86dd0..c08f579 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -79,7 +79,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
- : new PlainFactsHolder(sortFacts);
+ : new PlainFactsHolder(sortFacts, dimsComparator());
maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
}
diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
index 0d683d4..0ee0d07 100644
--- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
@@ -240,7 +240,7 @@ public class IncrementalIndexTest
}
return new IncrementalIndex.Builder()
- .setSimpleTestingIndexSchema(aggregatorFactories)
+ .setSimpleTestingIndexSchema(false, aggregatorFactories)
.setMaxRowCount(1000000)
.buildOnheap();
}
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 55f0546..7c24bce 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -19,6 +19,7 @@
package io.druid.segment.incremental;
+import io.druid.java.util.common.StringUtils;
import io.druid.segment.IndexSpec;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.RowIterator;
@@ -32,6 +33,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
public class IncrementalIndexAdapterTest
{
@@ -83,4 +85,81 @@ public class IncrementalIndexAdapterTest
Assert.assertEquals(0, (long) rowNums.get(0));
Assert.assertEquals(1, (long) rowNums.get(1));
}
+
+ @Test
+ public void testGetRowsIterableNoRollup() throws Exception
+ {
+ final long timestamp = System.currentTimeMillis();
+ IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
+ IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+ IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+ IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+
+
+ ArrayList<Integer> dim1Vals = new ArrayList<>();
+ for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+ dim1Vals.add(((int[]) row.getDims()[0])[0]);
+ }
+ ArrayList<Integer> dim2Vals = new ArrayList<>();
+ for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+ dim2Vals.add(((int[]) row.getDims()[1])[0]);
+ }
+
+ final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
+ toPersist1.getInterval(),
+ toPersist1,
+ INDEX_SPEC.getBitmapSerdeFactory()
+ .getBitmapFactory()
+ );
+
+ RowIterator rows = incrementalAdapter.getRows();
+ List<String> rowStrings = new ArrayList<>();
+ while (rows.moveToNext()) {
+ rowStrings.add(rows.getPointer().toString());
+ }
+
+ Function<Integer, String> getExpected = (rowNumber) -> {
+ if (rowNumber < 3) {
+ return StringUtils.format(
+ "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=1, dim2=2}, metrics={count=1}}",
+ rowNumber,
+ timestamp
+ );
+ } else {
+ return StringUtils.format(
+ "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=3, dim2=4}, metrics={count=1}}",
+ rowNumber,
+ timestamp
+ );
+ }
+ };
+
+
+ // without sorting, output would be
+ // RowPointer{indexNum=0, rowNumber=0, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=1, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=2, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=3, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=4, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=5, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+ // but with sorting, output should be
+ // RowPointer{indexNum=0, rowNumber=0, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=1, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=2, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=3, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=4, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+ // RowPointer{indexNum=0, rowNumber=5, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+
+ Assert.assertEquals(6, rowStrings.size());
+ for (int i = 0; i < 6; i++) {
+ if (i % 2 == 0) {
+ Assert.assertEquals(0, (long) dim1Vals.get(i));
+ Assert.assertEquals(0, (long) dim2Vals.get(i));
+ } else {
+ Assert.assertEquals(1, (long) dim1Vals.get(i));
+ Assert.assertEquals(1, (long) dim2Vals.get(i));
+ }
+ Assert.assertEquals(getExpected.apply(i), rowStrings.get(i));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org