You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/10/05 11:24:34 UTC

[GitHub] [druid] Eshcar commented on a change in pull request #10335: Configurable Index Type

Eshcar commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r499506440



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
##########
@@ -115,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
   public void testConvert()
   {
     KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+        null,

Review comment:
       One way to avoid passing null (in multiple places) is to add another constructor that takes the additional parameter while still supporting the previous constructor.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       Please add a comment noting that this is the line that sets the (configurable) I2 type.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -210,8 +211,8 @@ private InputSourceReader buildReader(
         .withRollup(dataSchema.getGranularitySpec().isRollup())
         .build();
 
-    return new IncrementalIndex.Builder().setIndexSchema(schema)
+    return (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder().setIndexSchema(schema)

Review comment:
       Is this downcast required?

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
##########
@@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask(
     boolean resetOffsetAutomatically = false;
     int maxRowsInMemory = 1000;
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
+        null,

Review comment:
       Likewise, a null parameter is passed here.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
##########
@@ -302,11 +301,11 @@ private static IncrementalIndex makeIncrementalIndex(
         .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
         .build();
 
-    IncrementalIndex newIndex = new IncrementalIndex.Builder()
+    IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()

Review comment:
       Please add a comment noting that this is the line that makes I2 configurable.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       Likewise, this is duplicated code.

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
##########
@@ -193,7 +196,7 @@ public String toString()
            "maxRowsInMemory=" + getMaxRowsInMemory() +
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
-           ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
+           ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +

Review comment:
       nice. Looks like a change in the right direction

##########
File path: extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
##########
@@ -317,6 +318,32 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException
 
   }
 
+  @Test

Review comment:
       How is this test related to the configurable index type?
   Please add a documentation line explaining the test.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       Having this code duplicated is weird.
   Shouldn't all these XXXTuningConfig classes have a common parent that holds the shared code?

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,129 +316,6 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder

Review comment:
       so this static class has now changed to an abstract class with different concrete builder classes - nice

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
           new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
       );
 
-      if (i == 0) {
-        aggOffsetInBuffer[i] = 0;
-      } else {
-        aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
-      }
+      aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;

Review comment:
       nice - simpler

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()

Review comment:
       This method should return IncrementalIndex.
   Is it legal to change the signature?

##########
File path: processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
##########
@@ -117,24 +120,24 @@ public String apply(DimensionSpec input)
         .withMinTimestamp(granTimeStart.getMillis())
         .build();
 
+
+    AppendableIndexBuilder indexBuilder;
+
     if (query.getContextValue("useOffheap", false)) {
-      index = new IncrementalIndex.Builder()
-          .setIndexSchema(indexSchema)
-          .setDeserializeComplexMetrics(false)
-          .setConcurrentEventAdd(true)
-          .setSortFacts(sortResults)
-          .setMaxRowCount(querySpecificConfig.getMaxResults())
-          .buildOffheap(bufferPool);
+      indexBuilder = new OffheapIncrementalIndex.Builder()
+          .setBufferPool(bufferPool);
     } else {
-      index = new IncrementalIndex.Builder()
-          .setIndexSchema(indexSchema)
-          .setDeserializeComplexMetrics(false)
-          .setConcurrentEventAdd(true)
-          .setSortFacts(sortResults)
-          .setMaxRowCount(querySpecificConfig.getMaxResults())
-          .buildOnheap();
+      indexBuilder = new OnheapIncrementalIndex.Builder();
     }
 
+    index = indexBuilder

Review comment:
       great. reducing duplication

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -434,4 +436,39 @@ public ColumnCapabilities getColumnCapabilities(String columnName)
     }
   }
 
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Override
+    protected OnheapIncrementalIndex buildInner()

Review comment:
       Likewise - this has a different signature from the parent class.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
           new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
       );
 
-      if (i == 0) {
-        aggOffsetInBuffer[i] = 0;
-      } else {
-        aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
-      }
+      aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
+      aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
     }
 
-    aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
-                                                                    - 1].getMaxIntermediateSizeWithNulls();
+    aggsTotalSize = aggsCurOffsetInBuffer;

Review comment:
       why is this correct?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org