Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/06 21:17:50 UTC

[GitHub] gianm closed pull request #6107: Order rows during incremental index persist when rollup is disabled.

URL: https://github.com/apache/incubator-druid/pull/6107

This is a PR merged from a forked repository. As GitHub hides the
original diff on merge, it is displayed below for the sake of
provenance. A short illustrative sketch of the change is included
before the diff:
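When rollup is disabled, the incremental index keeps its facts in
PlainFactsHolder, which groups rows only by timestamp, so rows with
identical dimension values are not necessarily adjacent when the index
is persisted. This patch adds a persistIterable() method to the
FactsHolder interface: the plain (no-rollup) implementation flattens the
per-timestamp groups and sorts them with the index's row comparator (see
timeAndDimsOrderedConcat in the diff), while the rollup implementation
simply returns keySet(), since its facts are already sorted. Below is a
minimal, self-contained sketch of that ordering idea in plain Java;
PersistOrderSketch, the Row class, and the comparator here are
simplified placeholders rather than the actual Druid types:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.Deque;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    public class PersistOrderSketch
    {
      // Simplified stand-in for IncrementalIndexRow: a timestamp plus two int-coded dimensions.
      static class Row
      {
        final long timestamp;
        final int[] dims;

        Row(long timestamp, int[] dims)
        {
          this.timestamp = timestamp;
          this.dims = dims;
        }

        @Override
        public String toString()
        {
          return timestamp + Arrays.toString(dims);
        }
      }

      public static void main(String[] args)
      {
        // Without rollup, facts are held per timestamp in insertion order,
        // like PlainFactsHolder's Map<Long, Deque<IncrementalIndexRow>>.
        Map<Long, Deque<Row>> facts = new TreeMap<>();
        facts.computeIfAbsent(1L, k -> new ArrayDeque<>()).add(new Row(1L, new int[]{3, 4}));
        facts.computeIfAbsent(1L, k -> new ArrayDeque<>()).add(new Row(1L, new int[]{1, 2}));
        facts.computeIfAbsent(2L, k -> new ArrayDeque<>()).add(new Row(2L, new int[]{1, 2}));

        // Stand-in for the comparator passed to PlainFactsHolder (dimsComparator() in the patch):
        // time first, then dimension values.
        Comparator<Row> rowComparator = Comparator
            .comparingLong((Row r) -> r.timestamp)
            .thenComparingInt(r -> r.dims[0])
            .thenComparingInt(r -> r.dims[1]);

        // Same idea as the new timeAndDimsOrderedConcat(): flatten the per-timestamp
        // groups and sort the whole stream before handing rows to the persist step.
        List<Row> persistOrder = facts.values()
                                      .stream()
                                      .flatMap(Collection::stream)
                                      .sorted(rowComparator)
                                      .collect(Collectors.toList());

        persistOrder.forEach(System.out::println); // prints 1[1, 2], 1[3, 4], 2[1, 2]
      }
    }

In the actual patch, PlainFactsHolder receives the comparator through its
new constructor parameter (dimsComparator() from the owning index), so
persist order for the no-rollup case matches the time-and-dims ordering
already used when rollup is enabled.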

diff --git a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
index d50c1a04307..775e94b353d 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/datagen/BenchmarkSchemas.java
@@ -157,4 +157,54 @@
     );
     SCHEMA_MAP.put("simpleFloat", basicSchema);
   }
+
+  static { // schema with high opportunity for rollup
+    List<BenchmarkColumnSchema> rolloColumns = ImmutableList.of(
+        // dims
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimEnumerated",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeEnumerated(
+            "dimEnumerated2",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
+            Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
+        ),
+        BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 100, 2.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100),
+
+        // metrics
+        BenchmarkColumnSchema.makeZipf("metLongZipf", ValueType.LONG, true, 1, null, 0, 10000, 2.0),
+        BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
+        BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
+        BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.5)
+    );
+
+    List<AggregatorFactory> rolloSchemaIngestAggs = new ArrayList<>();
+    rolloSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
+    rolloSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
+    rolloSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
+    rolloSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
+    rolloSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
+    rolloSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
+
+    Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
+
+    BenchmarkSchemaInfo rolloSchema = new BenchmarkSchemaInfo(
+        rolloColumns,
+        rolloSchemaIngestAggs,
+        basicSchemaDataInterval,
+        true
+    );
+    SCHEMA_MAP.put("rollo", rolloSchema);
+  }
 }
diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
index 6d5e172fd51..c299b09eae7 100644
--- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -32,7 +32,6 @@
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexMergerV9;
 import io.druid.segment.IndexSpec;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.serde.ComplexMetrics;
@@ -64,44 +63,38 @@
 @Measurement(iterations = 25)
 public class IndexPersistBenchmark
 {
-  @Param({"75000"})
-  private int rowsPerSegment;
-
-  @Param({"basic"})
-  private String schema;
-
-  @Param({"true", "false"})
-  private boolean rollup;
-
+  public static final ObjectMapper JSON_MAPPER;
   private static final Logger log = new Logger(IndexPersistBenchmark.class);
   private static final int RNG_SEED = 9999;
-
-  private IncrementalIndex incIndex;
-  private ArrayList<InputRow> rows;
-  private BenchmarkSchemaInfo schemaInfo;
-
-
   private static final IndexMergerV9 INDEX_MERGER_V9;
   private static final IndexIO INDEX_IO;
-  public static final ObjectMapper JSON_MAPPER;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
         OffHeapMemorySegmentWriteOutMediumFactory.instance(),
-        new ColumnConfig()
-        {
-          @Override
-          public int columnCacheSizeBytes()
-          {
-            return 0;
-          }
-        }
+        () -> 0
     );
     INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
   }
 
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"rollo"})
+  private String schema;
+
+  @Param({"true", "false"})
+  private boolean rollup;
+
+  @Param({"none", "moderate", "high"})
+  private String rollupOpportunity;
+
+  private IncrementalIndex incIndex;
+  private ArrayList<InputRow> rows;
+  private BenchmarkSchemaInfo schemaInfo;
+
   @Setup
   public void setup()
   {
@@ -114,11 +107,23 @@ public void setup()
     rows = new ArrayList<InputRow>();
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
 
+    int valuesPerTimestamp = 1;
+    switch (rollupOpportunity) {
+      case "moderate":
+        valuesPerTimestamp = 1000;
+        break;
+      case "high":
+        valuesPerTimestamp = 10000;
+        break;
+
+    }
+
     BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
-        schemaInfo.getDataInterval(),
-        rowsPerSegment
+        schemaInfo.getDataInterval().getStartMillis(),
+        valuesPerTimestamp,
+        1000.0
     );
 
     for (int i = 0; i < rowsPerSegment; i++) {
@@ -128,8 +133,6 @@ public void setup()
       }
       rows.add(row);
     }
-
-
   }
 
   @Setup(Level.Iteration)
@@ -154,9 +157,9 @@ private IncrementalIndex makeIncIndex()
     return new IncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
-            .withMetrics(schemaInfo.getAggsArray())
-            .withRollup(rollup)
-            .build()
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(rollup)
+                .build()
         )
         .setReportParseExceptions(false)
         .setMaxRowCount(rowsPerSegment)
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
index cdf8957b493..0faeaef578d 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java
@@ -80,6 +80,7 @@
 import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
@@ -94,6 +95,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 /**
  */
@@ -366,9 +368,24 @@ public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchem
     @VisibleForTesting
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      this.incrementalIndexSchema = new IncrementalIndexSchema.Builder()
-          .withMetrics(metrics)
-          .build();
+      return setSimpleTestingIndexSchema(null, metrics);
+    }
+
+
+    /**
+     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+     * would use it in production settings.
+     *
+     * @param metrics variable array of {@link AggregatorFactory} metrics
+     *
+     * @return this
+     */
+    @VisibleForTesting
+    public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+    {
+      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
       return this;
     }
 
@@ -1202,6 +1219,12 @@ private static boolean allNull(Object[] dims, int startPosition)
 
     Iterable<IncrementalIndexRow> keySet();
 
+    /**
+     * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator<IncrementalIndexRow>}
+     * @return
+     */
+    Iterable<IncrementalIndexRow> persistIterable();
+
     /**
      * @return the previous rowIndex associated with the specified key, or
      * {@link IncrementalIndexRow#EMPTY_ROW_INDEX} if there was no mapping for the key.
@@ -1289,6 +1312,13 @@ public long getMaxTimeMillis()
       return facts.keySet();
     }
 
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      // with rollup, facts are already pre-sorted so just return keyset
+      return keySet();
+    }
+
     @Override
     public int putIfAbsent(IncrementalIndexRow key, int rowIndex)
     {
@@ -1310,7 +1340,9 @@ public void clear()
     private final boolean sortFacts;
     private final ConcurrentMap<Long, Deque<IncrementalIndexRow>> facts;
 
-    public PlainFactsHolder(boolean sortFacts)
+    private final Comparator<IncrementalIndexRow> incrementalIndexRowComparator;
+
+    public PlainFactsHolder(boolean sortFacts, Comparator<IncrementalIndexRow> incrementalIndexRowComparator)
     {
       this.sortFacts = sortFacts;
       if (sortFacts) {
@@ -1318,6 +1350,7 @@ public PlainFactsHolder(boolean sortFacts)
       } else {
         this.facts = new ConcurrentHashMap<>();
       }
+      this.incrementalIndexRowComparator = incrementalIndexRowComparator;
     }
 
     @Override
@@ -1351,10 +1384,10 @@ public long getMaxTimeMillis()
     public Iterator<IncrementalIndexRow> iterator(boolean descending)
     {
       if (descending && sortFacts) {
-        return concat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
+        return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
                 .descendingMap().values(), true).iterator();
       }
-      return concat(facts.values(), false).iterator();
+      return timeOrderedConcat(facts.values(), false).iterator();
     }
 
     @Override
@@ -1363,10 +1396,10 @@ public long getMaxTimeMillis()
       ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>> subMap =
           ((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts).subMap(timeStart, timeEnd);
       final Map<Long, Deque<IncrementalIndexRow>> rangeMap = descending ? subMap.descendingMap() : subMap;
-      return concat(rangeMap.values(), descending);
+      return timeOrderedConcat(rangeMap.values(), descending);
     }
 
-    private Iterable<IncrementalIndexRow> concat(
+    private Iterable<IncrementalIndexRow> timeOrderedConcat(
         final Iterable<Deque<IncrementalIndexRow>> iterable,
         final boolean descending
     )
@@ -1379,10 +1412,25 @@ public long getMaxTimeMillis()
       );
     }
 
+    private Stream<IncrementalIndexRow> timeAndDimsOrderedConcat(
+        final Collection<Deque<IncrementalIndexRow>> rowGroups
+    )
+    {
+      return rowGroups.stream()
+                      .flatMap(Collection::stream)
+                      .sorted(incrementalIndexRowComparator);
+    }
+
     @Override
     public Iterable<IncrementalIndexRow> keySet()
     {
-      return concat(facts.values(), false);
+      return timeOrderedConcat(facts.values(), false);
+    }
+
+    @Override
+    public Iterable<IncrementalIndexRow> persistIterable()
+    {
+      return () -> timeAndDimsOrderedConcat(facts.values()).iterator();
     }
 
     @Override
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
index 40ad4ecbe7b..59856fab3ba 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java
@@ -93,7 +93,7 @@ private void processRows(
   )
   {
     int rowNum = 0;
-    for (IncrementalIndexRow row : index.getFacts().keySet()) {
+    for (IncrementalIndexRow row : index.getFacts().persistIterable()) {
       final Object[] dims = row.getDims();
 
       for (IncrementalIndex.DimensionDesc dimension : dimensions) {
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
index 803460cb8ad..f3634c2882f 100644
--- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
+++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRowIterator.java
@@ -50,7 +50,7 @@
 
   IncrementalIndexRowIterator(IncrementalIndex<?> incrementalIndex)
   {
-    this.timeAndDimsIterator = incrementalIndex.getFacts().keySet().iterator();
+    this.timeAndDimsIterator = incrementalIndex.getFacts().persistIterable().iterator();
     this.currentRowPointer = makeRowPointer(incrementalIndex, currentRowHolder, currentRowNumCounter);
     // markedRowPointer doesn't actually need to be a RowPointer (just a TimeAndDimsPointer), but we create a RowPointer
     // in order to reuse the makeRowPointer() method. Passing a dummy RowNumCounter.
diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
index 472adfdc673..198137bb8ba 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -84,7 +84,7 @@
     this.bufferPool = bufferPool;
 
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new PlainFactsHolder(sortFacts);
+                                                   : new PlainFactsHolder(sortFacts, dimsComparator());
 
     //check that stupid pool gives buffers that can hold at least one row's aggregators
     ResourceHolder<ByteBuffer> bb = bufferPool.take();
diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
index 0b86dd05ce7..c08f57999b8 100644
--- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -79,7 +79,7 @@
     this.maxRowCount = maxRowCount;
     this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
-                                                   : new PlainFactsHolder(sortFacts);
+                                                   : new PlainFactsHolder(sortFacts, dimsComparator());
     maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
   }
 
diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
index 0d683d45ec2..0ee0d071cc8 100644
--- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java
@@ -240,7 +240,7 @@ public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregato
     }
 
     return new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(aggregatorFactories)
+        .setSimpleTestingIndexSchema(false, aggregatorFactories)
         .setMaxRowCount(1000000)
         .buildOnheap();
   }
diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 55f05460b3e..7c24bcef029 100644
--- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.incremental;
 
+import io.druid.java.util.common.StringUtils;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.IndexableAdapter;
 import io.druid.segment.RowIterator;
@@ -32,6 +33,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 public class IncrementalIndexAdapterTest
 {
@@ -83,4 +85,81 @@ public void testGetRowsIterable() throws Exception
     Assert.assertEquals(0, (long) rowNums.get(0));
     Assert.assertEquals(1, (long) rowNums.get(1));
   }
+
+  @Test
+  public void testGetRowsIterableNoRollup() throws Exception
+  {
+    final long timestamp = System.currentTimeMillis();
+    IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist1);
+
+
+    ArrayList<Integer> dim1Vals = new ArrayList<>();
+    for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+      dim1Vals.add(((int[]) row.getDims()[0])[0]);
+    }
+    ArrayList<Integer> dim2Vals = new ArrayList<>();
+    for (IncrementalIndexRow row : toPersist1.getFacts().keySet()) {
+      dim2Vals.add(((int[]) row.getDims()[1])[0]);
+    }
+
+    final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
+        toPersist1.getInterval(),
+        toPersist1,
+        INDEX_SPEC.getBitmapSerdeFactory()
+                  .getBitmapFactory()
+    );
+
+    RowIterator rows = incrementalAdapter.getRows();
+    List<String> rowStrings = new ArrayList<>();
+    while (rows.moveToNext()) {
+      rowStrings.add(rows.getPointer().toString());
+    }
+
+    Function<Integer, String> getExpected = (rowNumber) -> {
+      if (rowNumber < 3) {
+        return StringUtils.format(
+            "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=1, dim2=2}, metrics={count=1}}",
+            rowNumber,
+            timestamp
+        );
+      } else {
+        return StringUtils.format(
+            "RowPointer{indexNum=0, rowNumber=%s, timestamp=%s, dimensions={dim1=3, dim2=4}, metrics={count=1}}",
+            rowNumber,
+            timestamp
+        );
+      }
+    };
+
+
+    // without sorting, output would be
+    //    RowPointer{indexNum=0, rowNumber=0, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=1, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=2, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=3, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=4, timestamp=1533347274588, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=5, timestamp=1533347274588, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    // but with sorting, output should be
+    //    RowPointer{indexNum=0, rowNumber=0, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=1, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=2, timestamp=1533347361396, dimensions={dim1=1, dim2=2}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=3, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=4, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+    //    RowPointer{indexNum=0, rowNumber=5, timestamp=1533347361396, dimensions={dim1=3, dim2=4}, metrics={count=1}}
+
+    Assert.assertEquals(6, rowStrings.size());
+    for (int i = 0; i < 6; i++) {
+      if (i % 2 == 0) {
+        Assert.assertEquals(0, (long) dim1Vals.get(i));
+        Assert.assertEquals(0, (long) dim2Vals.get(i));
+      } else {
+        Assert.assertEquals(1, (long) dim1Vals.get(i));
+        Assert.assertEquals(1, (long) dim2Vals.get(i));
+      }
+      Assert.assertEquals(getExpected.apply(i), rowStrings.get(i));
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org