You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/24 22:25:20 UTC

[druid] branch 0.18.1 updated: Optimize FileWriteOutBytes to avoid high system cpu usage (#9722) (#9770)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.1 by this push:
     new 29073c5  Optimize FileWriteOutBytes to avoid high system cpu usage (#9722) (#9770)
29073c5 is described below

commit 29073c56ec0758f6e43dd21e2f4b2e9976c8f3cc
Author: Suneet Saldanha <44...@users.noreply.github.com>
AuthorDate: Fri Apr 24 15:25:06 2020 -0700

    Optimize FileWriteOutBytes to avoid high system cpu usage (#9722) (#9770)
    
    * optimize FileWriteOutBytes to avoid high sys cpu
    
    * optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException
    
    * optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size
    
    * Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size"
    
    This reverts commit 965f7421
    
    * Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException"
    
    This reverts commit 149e08c0
    
    * optimize FileWriteOutBytes to avoid high sys cpu -- avoid IOException never thrown check
    
    * Fix size counting to handle IOE in FileWriteOutBytes + tests
    
    * remove unused throws IOException in WriteOutBytes.size()
    
    * Remove redundant throws IOException clauses
    
    * Parameterize IndexMergeBenchmark
    
    Co-authored-by: huanghui.bigrey <hu...@bytedance.com>
    Co-authored-by: Suneet Saldanha <su...@imply.io>
    
    Co-authored-by: BIGrey <hu...@163.com>
    Co-authored-by: huanghui.bigrey <hu...@bytedance.com>
---
 .../benchmark/indexing/IndexMergeBenchmark.java    | 87 ++++++++++++--------
 .../druid/segment/data/ByteBufferWriter.java       |  2 +-
 .../EntireLayoutColumnarDoublesSerializer.java     |  2 +-
 .../data/EntireLayoutColumnarFloatsSerializer.java |  2 +-
 .../druid/segment/data/GenericIndexedWriter.java   |  6 +-
 .../segment/serde/ComplexColumnSerializer.java     |  2 +-
 ...argeColumnSupportedComplexColumnSerializer.java |  2 +-
 .../druid/segment/serde/MetaSerdeHelper.java       |  6 +-
 .../druid/segment/writeout/FileWriteOutBytes.java  | 12 ++-
 .../druid/segment/writeout/WriteOutBytes.java      |  2 +-
 .../segment/writeout/FileWriteOutBytesTest.java    | 95 ++++++++++++++++++++--
 11 files changed, 161 insertions(+), 57 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 5f330ee..6b3b902 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -35,11 +35,13 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -66,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class IndexMergeBenchmark
 {
+
   @Param({"5"})
   private int numSegments;
 
@@ -78,9 +81,13 @@ public class IndexMergeBenchmark
   @Param({"true", "false"})
   private boolean rollup;
 
+  @Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"})
+  private SegmentWriteOutType factoryType;
+
+
   private static final Logger log = new Logger(IndexMergeBenchmark.class);
   private static final int RNG_SEED = 9999;
-  private static final IndexMergerV9 INDEX_MERGER_V9;
+
   private static final IndexIO INDEX_IO;
   public static final ObjectMapper JSON_MAPPER;
 
@@ -91,6 +98,7 @@ public class IndexMergeBenchmark
   private List<QueryableIndex> indexesToMerge;
   private BenchmarkSchemaInfo schemaInfo;
   private File tmpDir;
+  private IndexMergerV9 indexMergerV9;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
@@ -99,23 +107,16 @@ public class IndexMergeBenchmark
     JSON_MAPPER.setInjectableValues(injectableValues);
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        new ColumnConfig()
-        {
-          @Override
-          public int columnCacheSizeBytes()
-          {
-            return 0;
-          }
-        }
+        () -> 0
     );
-    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
   }
 
   @Setup
   public void setup() throws IOException
   {
-    log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+    indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType));
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
     indexesToMerge = new ArrayList<>();
@@ -143,7 +144,7 @@ public class IndexMergeBenchmark
       tmpDir = FileUtils.createTempDir();
       log.info("Using temp dir: " + tmpDir.getAbsolutePath());
 
-      File indexFile = INDEX_MERGER_V9.persist(
+      File indexFile = indexMergerV9.persist(
           incIndex,
           tmpDir,
           new IndexSpec(),
@@ -155,26 +156,6 @@ public class IndexMergeBenchmark
     }
   }
 
-  @TearDown
-  public void tearDown() throws IOException
-  {
-    FileUtils.deleteDirectory(tmpDir);
-  }
-
-  private IncrementalIndex makeIncIndex()
-  {
-    return new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-            .withMetrics(schemaInfo.getAggsArray())
-            .withRollup(rollup)
-            .build()
-        )
-        .setReportParseExceptions(false)
-        .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
-  }
-
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -186,7 +167,7 @@ public class IndexMergeBenchmark
     try {
       log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
 
-      File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
+      File mergedFile = indexMergerV9.mergeQueryableIndex(
           indexesToMerge,
           rollup,
           schemaInfo.getAggsArray(),
@@ -199,8 +180,46 @@ public class IndexMergeBenchmark
     }
     finally {
       tmpFile.delete();
+    }
+  }
+
+  @TearDown
+  public void tearDown() throws IOException
+  {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  public enum SegmentWriteOutType
+  {
+    TMP_FILE,
+    OFF_HEAP,
+    ON_HEAP
+  }
 
+  private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type)
+  {
+    switch (type) {
+      case TMP_FILE:
+        return TmpFileSegmentWriteOutMediumFactory.instance();
+      case OFF_HEAP:
+        return OffHeapMemorySegmentWriteOutMediumFactory.instance();
+      case ON_HEAP:
+        return OnHeapMemorySegmentWriteOutMediumFactory.instance();
     }
+    throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type);
+  }
 
+  private IncrementalIndex makeIncIndex()
+  {
+    return new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(rollup)
+                .build()
+        )
+        .setReportParseExceptions(false)
+        .setMaxRowCount(rowsPerSegment)
+        .buildOnheap();
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
index ee1e781..bda8115 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java
@@ -63,7 +63,7 @@ public class ByteBufferWriter<T> implements Serializer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return headerOut.size() + valueOut.size();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
index b330319..6265535 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
@@ -80,7 +80,7 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return META_SERDE_HELPER.size(this) + valuesOut.size();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
index 75b7290..a933265 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
@@ -81,7 +81,7 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return META_SERDE_HELPER.size(this) + valuesOut.size();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 95bc141..d4254ddd 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -296,7 +296,7 @@ public class GenericIndexedWriter<T> implements Serializer
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     if (requireMultipleFiles) {
       // for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file.
@@ -394,7 +394,7 @@ public class GenericIndexedWriter<T> implements Serializer
    *
    * @throws IOException
    */
-  private int bagSizePower() throws IOException
+  private int bagSizePower()
   {
     long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten;
 
@@ -421,7 +421,7 @@ public class GenericIndexedWriter<T> implements Serializer
    *
    * @throws IOException
    */
-  private boolean actuallyFits(int powerTwo) throws IOException
+  private boolean actuallyFits(int powerTwo)
   {
     long lastValueOffset = 0;
     long currentValueOffset = 0;
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
index 1441c78..c7a2d59 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java
@@ -66,7 +66,7 @@ public class ComplexColumnSerializer<T> implements GenericColumnSerializer<T>
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return writer.getSerializedSize();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
index bd68aef..c23c97c 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java
@@ -106,7 +106,7 @@ public class LargeColumnSupportedComplexColumnSerializer<T> implements GenericCo
   }
 
   @Override
-  public long getSerializedSize() throws IOException
+  public long getSerializedSize()
   {
     return writer.getSerializedSize();
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
index ddda31f..113821c 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java
@@ -117,7 +117,7 @@ public final class MetaSerdeHelper<T>
 
   public interface FieldWriter<T>
   {
-    void writeTo(ByteBuffer buffer, T x) throws IOException;
+    void writeTo(ByteBuffer buffer, T x);
 
     int size(T x);
   }
@@ -125,10 +125,10 @@ public final class MetaSerdeHelper<T>
   @FunctionalInterface
   public interface IntFieldWriter<T> extends FieldWriter<T>
   {
-    int getField(T x) throws IOException;
+    int getField(T x);
 
     @Override
-    default void writeTo(ByteBuffer buffer, T x) throws IOException
+    default void writeTo(ByteBuffer buffer, T x)
     {
       buffer.putInt(getField(x));
     }
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
index 9ab579d..b12b15e 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
@@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes
 {
   private final File file;
   private final FileChannel ch;
+  private long writeOutBytes;
 
   /** Purposely big-endian, for {@link #writeInt(int)} implementation */
   private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
@@ -44,6 +45,7 @@ final class FileWriteOutBytes extends WriteOutBytes
   {
     this.file = file;
     this.ch = ch;
+    this.writeOutBytes = 0L;
   }
   
   private void flushIfNeeded(int bytesNeeded) throws IOException
@@ -66,6 +68,7 @@ final class FileWriteOutBytes extends WriteOutBytes
   {
     flushIfNeeded(1);
     buffer.put((byte) b);
+    writeOutBytes++;
   }
 
   @Override
@@ -73,6 +76,7 @@ final class FileWriteOutBytes extends WriteOutBytes
   {
     flushIfNeeded(Integer.BYTES);
     buffer.putInt(v);
+    writeOutBytes += Integer.BYTES;
   }
 
   @Override
@@ -85,6 +89,7 @@ final class FileWriteOutBytes extends WriteOutBytes
       try {
         src.limit(src.position() + buffer.capacity());
         buffer.put(src);
+        writeOutBytes += buffer.capacity();
         flush();
       }
       finally {
@@ -92,7 +97,9 @@ final class FileWriteOutBytes extends WriteOutBytes
         src.limit(srcLimit);
       }
     }
+    int remaining = src.remaining();
     buffer.put(src);
+    writeOutBytes += remaining;
     return len;
   }
 
@@ -103,10 +110,9 @@ final class FileWriteOutBytes extends WriteOutBytes
   }
 
   @Override
-  public long size() throws IOException
+  public long size()
   {
-    flush();
-    return ch.size();
+    return writeOutBytes;
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
index f6a5e71..e1c2972 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java
@@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte
   /**
    * Returns the number of bytes written to this WriteOutBytes so far.
    */
-  public abstract long size() throws IOException;
+  public abstract long size();
 
   /**
    * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel.
diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
index cfaa418..8501fa6 100644
--- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.writeout;
 
 import org.easymock.EasyMock;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,28 +37,106 @@ public class FileWriteOutBytesTest
   @Before
   public void setUp()
   {
-    this.mockFileChannel = EasyMock.mock(FileChannel.class);
-    this.fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
+    mockFileChannel = EasyMock.mock(FileChannel.class);
+    fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
   }
 
   @Test
-  public void testWrite4KBInts() throws IOException
+  public void write4KBIntsShouldNotFlush() throws IOException
   {
     // Write 4KB of ints and expect the write operation of the file channel will be triggered only once.
-    EasyMock.expect(this.mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
             .andAnswer(() -> {
               ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
               int remaining = buffer.remaining();
               buffer.position(remaining);
               return remaining;
             }).times(1);
-    EasyMock.replay(this.mockFileChannel);
+    EasyMock.replay(mockFileChannel);
     final int writeBytes = 4096;
     final int numOfInt = writeBytes / Integer.BYTES;
     for (int i = 0; i < numOfInt; i++) {
-      this.fileWriteOutBytes.writeInt(i);
+      fileWriteOutBytes.writeInt(i);
     }
-    this.fileWriteOutBytes.flush();
-    EasyMock.verify(this.mockFileChannel);
+    // no need to flush up to 4KB
+    // the first byte after 4KB will cause a flush
+    fileWriteOutBytes.write(1);
+    EasyMock.verify(mockFileChannel);
+  }
+
+  @Test
+  public void writeShouldIncrementSize() throws IOException
+  {
+    fileWriteOutBytes.write(1);
+    Assert.assertEquals(1, fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void writeIntShouldIncrementSize() throws IOException
+  {
+    fileWriteOutBytes.writeInt(1);
+    Assert.assertEquals(4, fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andAnswer(() -> {
+              ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+              int remaining = buffer.remaining();
+              buffer.position(remaining);
+              return remaining;
+            }).times(1);
+    EasyMock.replay(mockFileChannel);
+    ByteBuffer src = ByteBuffer.allocate(4096 + 1);
+    fileWriteOutBytes.write(src);
+    Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+    EasyMock.verify(mockFileChannel);
+  }
+
+  @Test
+  public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andAnswer(() -> {
+              ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+              int remaining = buffer.remaining();
+              buffer.position(remaining);
+              return remaining;
+            }).once();
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andThrow(new IOException())
+            .once();
+    EasyMock.replay(mockFileChannel);
+    ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1);
+    try {
+      fileWriteOutBytes.write(src);
+      Assert.fail("IOException should have been thrown.");
+    }
+    catch (IOException e) {
+      // The second invocation to flush bytes fails. So the size should count what has already been put successfully
+      Assert.assertEquals(4096 * 2, fileWriteOutBytes.size());
+    }
+  }
+
+  @Test
+  public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+  {
+    ByteBuffer src = ByteBuffer.allocate(4096);
+    fileWriteOutBytes.write(src);
+    Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+  }
+  @Test
+  public void sizeDoesNotFlush() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andThrow(new AssertionError("file channel should not have been written to."));
+    EasyMock.replay(mockFileChannel);
+    long size = fileWriteOutBytes.size();
+    Assert.assertEquals(0, size);
+    fileWriteOutBytes.writeInt(10);
+    size = fileWriteOutBytes.size();
+    Assert.assertEquals(4, size);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org