Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/08/31 08:02:51 UTC

[GitHub] [druid] liran-funaro opened a new pull request #10335: [WIP] Design for a Configurable Index Type

liran-funaro opened a new pull request #10335:
URL: https://github.com/apache/druid/pull/10335


   Fixes #10321.
   This is a WIP for #10321.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501822993



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > We should look at incrementally refactoring HadoopTuningConfig.getRowFlushBoundary into getMaxRowsInMemory so that it could be moved into TuningConfig as well.
   
   I opened #10478 for that earlier; it is a short PR.
   If you prefer, I can apply your comments from this PR to #10478 and merge it before this one.
   It addresses both `HadoopTuningConfig.getRowFlushBoundary` and the `TuningConfig` interface refactoring.
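The three-way rule in the `getMaxBytesInMemoryOrDefault()` default method quoted above can be sketched in isolation. This is a hypothetical standalone sketch: `MaxBytesResolution` and `resolve` are illustrative names, not Druid API. The 1/6-of-max-heap default in `main` is an assumption borrowed from the `DEFAULT_MAX_BYTES_IN_MEMORY` constant this diff removes, not necessarily what `OnheapIncrementalIndex.Spec` returns, and `Runtime.maxMemory()` stands in for `JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes()`.

```java
import java.util.function.LongSupplier;

// Hypothetical standalone sketch of the rule in TuningConfig#getMaxBytesInMemoryOrDefault().
final class MaxBytesResolution
{
  static long resolve(long configuredMaxBytes, LongSupplier indexTypeDefault)
  {
    if (configuredMaxBytes > 0) {
      return configuredMaxBytes;            // explicit limit from the tuning config
    } else if (configuredMaxBytes == 0) {
      return indexTypeDefault.getAsLong();  // 0 (what a null config becomes): ask the index type
    } else {
      return Long.MAX_VALUE;                // negative: no byte-based limit
    }
  }

  public static void main(String[] args)
  {
    // Assumption: an on-heap default of 1/6 of the max heap, as in the removed constant.
    LongSupplier onheapDefault = () -> Runtime.getRuntime().maxMemory() / 6;
    System.out.println(resolve(100_000_000L, onheapDefault)); // 100000000
    System.out.println(resolve(0L, onheapDefault));           // depends on this JVM's max heap
    System.out.println(resolve(-1L, onheapDefault));          // 9223372036854775807
  }
}
```

Resolving 0 lazily, at the point of use, is what lets the default reflect the JVM of the task that actually builds the index, as the code comment in the diff explains.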






[GitHub] [druid] a2l007 commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
a2l007 commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501740100



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       @liran-funaro Could you please address this?

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > I was wondering why they are in `AppenderatorConfig` in the first place.
   
   I'm not sure. Maybe @jihoonson can help answer this.
   
   
   > So I think it is best that as much of the common API as possible will be in `TuningConfig`.
   > And these methods are indeed in common.
   
   We should look at incrementally refactoring `HadoopTuningConfig.getRowFlushBoundary` into `getMaxRowsInMemory` so that it could be moved into TuningConfig as well.
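One incremental path for the refactoring suggested here, sketched under assumptions: the common accessor moves into the shared interface, and the Hadoop-specific name survives as a deprecated alias so existing callers keep compiling. `CommonTuningConfig` and `HadoopLikeTuningConfig` are hypothetical stand-ins for `TuningConfig` and `HadoopTuningConfig`.

```java
// Hypothetical sketch: a shared accessor plus a deprecated legacy alias.
final class RowFlushBoundaryRefactor
{
  /** Stand-in for the common TuningConfig interface. */
  interface CommonTuningConfig
  {
    int getMaxRowsInMemory();
  }

  /** Stand-in for HadoopTuningConfig after the rename. */
  static final class HadoopLikeTuningConfig implements CommonTuningConfig
  {
    private final int maxRowsInMemory;

    HadoopLikeTuningConfig(int maxRowsInMemory)
    {
      this.maxRowsInMemory = maxRowsInMemory;
    }

    @Override
    public int getMaxRowsInMemory()
    {
      return maxRowsInMemory;
    }

    /** Legacy name kept so existing callers compile; delegates to the shared accessor. */
    @Deprecated
    public int getRowFlushBoundary()
    {
      return getMaxRowsInMemory();
    }
  }

  public static void main(String[] args)
  {
    CommonTuningConfig config = new HadoopLikeTuningConfig(75_000);
    // Generic code can read the limit without knowing it is a Hadoop config.
    System.out.println(config.getMaxRowsInMemory()); // 75000
  }
}
```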

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward compatibility to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       Sounds reasonable. Please create a GitHub issue for this so that we can track the incremental refactoring effort.






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501517293



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")
+      );
+    }
+  }
+
+  public static class Spec implements AppendableIndexSpec, Supplier<ByteBuffer>
+  {
+    public static final String TYPE = "offheap";
+    static final int DEFAULT_BUFFER_SIZE = 1 << 23;
+    static final int DEFAULT_CACHE_SIZE = 1 << 30;
+
+    final int bufferSize;
+    final int cacheSize;
+    final NonBlockingPool<ByteBuffer> bufferPool;
+
+    @JsonCreator
+    public Spec(
+        final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+        final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+    )
+    {
+      this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
+      this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
+      this.bufferPool = new StupidPool<>(
+          "Offheap incremental-index buffer pool",
+          this,
+          0,
+          this.cacheSize / this.bufferSize

Review comment:
       Removed
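The defaulting rules in the `Spec` constructor quoted above determine the value the diff passes as the last `StupidPool` argument (`cacheSize / bufferSize`). The following hypothetical re-statement, with illustrative method names, makes the arithmetic explicit: with both sizes unset, 1 GiB / 8 MiB gives 128. It also shows one edge case that follows from the quoted code: a requested `bufferSize` above 1 GiB with no `cacheSize` yields a quotient of 0.

```java
// Hypothetical re-statement of the Spec constructor's size defaults.
final class OffheapSpecSizing
{
  static final int DEFAULT_BUFFER_SIZE = 1 << 23; // 8 MiB
  static final int DEFAULT_CACHE_SIZE = 1 << 30;  // 1 GiB

  static int resolveBufferSize(Integer bufferSize)
  {
    return bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
  }

  static int resolveCacheSize(Integer cacheSize, int resolvedBufferSize)
  {
    // A cache that is not larger than one buffer falls back to the default.
    return cacheSize != null && cacheSize > resolvedBufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
  }

  /** The quotient the diff passes as the last StupidPool constructor argument. */
  static int pooledBuffers(Integer bufferSize, Integer cacheSize)
  {
    int buffer = resolveBufferSize(bufferSize);
    return resolveCacheSize(cacheSize, buffer) / buffer;
  }

  public static void main(String[] args)
  {
    System.out.println(pooledBuffers(null, null));          // 128 (1 GiB / 8 MiB)
    System.out.println(pooledBuffers(1 << 20, 1 << 24));    // 16
    System.out.println(pooledBuffers((1 << 30) + 1, null)); // 0: buffer larger than default cache
  }
}
```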






[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-709328478


   @a2l007 I addressed all your comments. Is there anything else I need to do to move this PR forward?




[GitHub] [druid] Eshcar commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
Eshcar commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r499506440



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
##########
@@ -115,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
   public void testConvert()
   {
     KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+        null,

Review comment:
       One way to avoid passing `null` (in multiple places) is to add another constructor that takes the additional parameter while keeping the previous constructor.
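A minimal sketch of this suggestion, with hypothetical names (`TuningConfigSketch`, `IndexTypeSpec`) standing in for the real tuning config and `AppendableIndexSpec`: the new constructor carries the extra parameter, and the old signature delegates to it, so existing call sites need no `null`.

```java
// Hypothetical sketch of the "extra constructor" suggestion.
final class OverloadedCtorSketch
{
  /** Stand-in for AppendableIndexSpec. */
  interface IndexTypeSpec
  {
    String type();
  }

  static final IndexTypeSpec DEFAULT_APPENDABLE_INDEX = () -> "onheap";

  static final class TuningConfigSketch
  {
    final IndexTypeSpec appendableIndexSpec;
    final int maxRowsInMemory;

    // New constructor with the additional parameter; null still falls back to the default.
    TuningConfigSketch(IndexTypeSpec appendableIndexSpec, int maxRowsInMemory)
    {
      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
      this.maxRowsInMemory = maxRowsInMemory;
    }

    // Previous signature, kept so existing call sites need no extra null argument.
    TuningConfigSketch(int maxRowsInMemory)
    {
      this(null, maxRowsInMemory);
    }
  }

  public static void main(String[] args)
  {
    TuningConfigSketch legacyCall = new TuningConfigSketch(1000);
    TuningConfigSketch explicit = new TuningConfigSketch(() -> "offheap", 1000);
    System.out.println(legacyCall.appendableIndexSpec.type()); // onheap
    System.out.println(explicit.appendableIndexSpec.type());   // offheap
  }
}
```

For JSON-bound configs, only one constructor would typically carry `@JsonCreator`, so the legacy overload would mainly serve Java callers such as tests.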

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       Add a comment noting that this is the line that sets the (configurable) I2 (incremental index) type.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -210,8 +211,8 @@ private InputSourceReader buildReader(
         .withRollup(dataSchema.getGranularitySpec().isRollup())
         .build();
 
-    return new IncrementalIndex.Builder().setIndexSchema(schema)
+    return (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder().setIndexSchema(schema)

Review comment:
       Is this downcast required?
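The cast is presumably needed because the setters are declared on the shared `AppendableIndexBuilder` base and return that type, so the chained `build()` is statically typed as the base index. One common way to drop such casts, shown here as a hypothetical alternative rather than what the PR does, is a self-bounded generic builder. All names are illustrative, and a `String` stands in for the index schema.

```java
// Hypothetical alternative: a self-bounded generic builder, so chained setters keep the subclass type.
final class SelfTypedBuilderSketch
{
  /** Stand-in for IncrementalIndex. */
  interface Index
  {
  }

  /** Stand-in for OnheapIncrementalIndex; a String plays the schema. */
  static final class OnheapIndex implements Index
  {
    final String schema;

    OnheapIndex(String schema)
    {
      this.schema = schema;
    }
  }

  abstract static class BaseBuilder<B extends BaseBuilder<B, I>, I extends Index>
  {
    protected String schema;

    @SuppressWarnings("unchecked")
    private B self()
    {
      return (B) this; // safe as long as each subclass passes itself as B
    }

    B setIndexSchema(String schema)
    {
      this.schema = schema;
      return self();
    }

    I build()
    {
      if (schema == null) {
        throw new IllegalArgumentException("schema is null");
      }
      return buildInner();
    }

    protected abstract I buildInner();
  }

  static final class OnheapBuilder extends BaseBuilder<OnheapBuilder, OnheapIndex>
  {
    @Override
    protected OnheapIndex buildInner()
    {
      return new OnheapIndex(schema);
    }
  }

  public static void main(String[] args)
  {
    // No downcast: build() is already typed as OnheapIndex.
    OnheapIndex index = new OnheapBuilder().setIndexSchema("example-schema").build();
    System.out.println(index.schema); // example-schema
  }
}
```

The trade-off is extra generic parameters on every builder class.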

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
##########
@@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask(
     boolean resetOffsetAutomatically = false;
     int maxRowsInMemory = 1000;
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
+        null,

Review comment:
       Likewise, the `null` parameter here.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
##########
@@ -302,11 +301,11 @@ private static IncrementalIndex makeIncrementalIndex(
         .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
         .build();
 
-    IncrementalIndex newIndex = new IncrementalIndex.Builder()
+    IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()

Review comment:
       Add a comment: this is the line that makes the I2 (incremental index) type configurable.
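The point of this line is that the caller asks the configured spec for a builder instead of naming a concrete index class. Below is a hypothetical, dependency-free sketch of that shape; `AppendableSpec`, `IndexBuilder`, and the string-backed `Index` are illustrative stand-ins, since the real `AppendableIndexSpec.builder()` is only visible here through this one call site.

```java
// Hypothetical sketch: the configured spec chooses the concrete builder.
final class SpecDrivenBuilderSketch
{
  /** Stand-in for IncrementalIndex. */
  interface Index
  {
    String describe();
  }

  /** Stand-in for AppendableIndexBuilder. */
  interface IndexBuilder
  {
    IndexBuilder setMaxRowCount(int maxRowCount);

    Index build();
  }

  /** Stand-in for AppendableIndexSpec: each index type supplies its own builder. */
  interface AppendableSpec
  {
    IndexBuilder builder();
  }

  static final class NamedBuilder implements IndexBuilder
  {
    private final String type;
    private int maxRowCount;

    NamedBuilder(String type)
    {
      this.type = type;
    }

    @Override
    public IndexBuilder setMaxRowCount(int maxRowCount)
    {
      this.maxRowCount = maxRowCount;
      return this;
    }

    @Override
    public Index build()
    {
      final String description = type + "(maxRowCount=" + maxRowCount + ")";
      return () -> description;
    }
  }

  static final class OnheapSpec implements AppendableSpec
  {
    @Override
    public IndexBuilder builder()
    {
      return new NamedBuilder("onheap");
    }
  }

  static final class OffheapSpec implements AppendableSpec
  {
    @Override
    public IndexBuilder builder()
    {
      return new NamedBuilder("offheap");
    }
  }

  // Mirrors the call site: this code never names a concrete index class.
  static Index makeIndex(AppendableSpec spec, int maxRowCount)
  {
    return spec.builder().setMaxRowCount(maxRowCount).build();
  }

  public static void main(String[] args)
  {
    System.out.println(makeIndex(new OnheapSpec(), 1000).describe());  // onheap(maxRowCount=1000)
    System.out.println(makeIndex(new OffheapSpec(), 1000).describe()); // offheap(maxRowCount=1000)
  }
}
```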

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       Likewise, this is duplicated code.

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
##########
@@ -193,7 +196,7 @@ public String toString()
            "maxRowsInMemory=" + getMaxRowsInMemory() +
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
-           ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
+           ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +

Review comment:
       nice. Looks like a change in the right direction

##########
File path: extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
##########
@@ -317,6 +318,32 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException
 
   }
 
+  @Test

Review comment:
       How is this test related to the configurable index type?
   Please add a documentation line explaining the test.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       Having this code duplicated is weird.
   Shouldn't all these XXXTuningConfig classes have a common parent with the shared code?
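A minimal sketch of the common-parent idea, with hypothetical class names: the null-to-default logic that each constructor repeats lives once in an abstract base, and the concrete configs only pass their arguments through. Java allows a single superclass, so where a real config already extends another class, a shared static helper may fit better.

```java
// Hypothetical sketch: shared defaulting moved into one abstract parent.
final class CommonTuningParentSketch
{
  /** Stand-in for AppendableIndexSpec. */
  interface IndexTypeSpec
  {
    String type();
  }

  static final IndexTypeSpec DEFAULT_APPENDABLE_INDEX = () -> "onheap";

  abstract static class BaseTuningConfig
  {
    private final IndexTypeSpec appendableIndexSpec;
    private final long maxBytesInMemory;

    protected BaseTuningConfig(IndexTypeSpec appendableIndexSpec, Long maxBytesInMemory)
    {
      // The null-to-default logic each XXXTuningConfig constructor repeats, written once.
      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
      // null becomes 0, meaning "use the index type's default" when read later.
      this.maxBytesInMemory = maxBytesInMemory == null ? 0L : maxBytesInMemory;
    }

    IndexTypeSpec getAppendableIndexSpec()
    {
      return appendableIndexSpec;
    }

    long getMaxBytesInMemory()
    {
      return maxBytesInMemory;
    }
  }

  static final class IndexTuningConfigSketch extends BaseTuningConfig
  {
    IndexTuningConfigSketch(IndexTypeSpec appendableIndexSpec, Long maxBytesInMemory)
    {
      super(appendableIndexSpec, maxBytesInMemory);
    }
  }

  static final class RealtimeTuningConfigSketch extends BaseTuningConfig
  {
    RealtimeTuningConfigSketch(IndexTypeSpec appendableIndexSpec, Long maxBytesInMemory)
    {
      super(appendableIndexSpec, maxBytesInMemory);
    }
  }

  public static void main(String[] args)
  {
    BaseTuningConfig batch = new IndexTuningConfigSketch(null, null);
    BaseTuningConfig realtime = new RealtimeTuningConfigSketch(() -> "offheap", 50_000_000L);
    System.out.println(batch.getAppendableIndexSpec().type() + " " + batch.getMaxBytesInMemory());       // onheap 0
    System.out.println(realtime.getAppendableIndexSpec().type() + " " + realtime.getMaxBytesInMemory()); // offheap 50000000
  }
}
```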

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,129 +316,6 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder

Review comment:
       so this static class has now changed to an abstract class with different concrete builder classes - nice

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
           new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
       );
 
-      if (i == 0) {
-        aggOffsetInBuffer[i] = 0;
-      } else {
-        aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
-      }
+      aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;

Review comment:
       nice - simpler

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()

Review comment:
       This method should return IncrementalIndex.
   Is it legal to change the signature?
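On the signature question: Java has allowed covariant return types since Java 5, so an override may declare a subtype of the parent method's return type. A minimal sketch with hypothetical stub classes standing in for `IncrementalIndex`, `OffheapIncrementalIndex`, and the builders:

```java
// Hypothetical sketch: an override may narrow its return type (covariant return, Java 5+).
final class CovariantReturnSketch
{
  /** Stand-in for IncrementalIndex. */
  static class IndexStub
  {
    String kind()
    {
      return "generic";
    }
  }

  /** Stand-in for OffheapIncrementalIndex. */
  static final class OffheapIndexStub extends IndexStub
  {
    @Override
    String kind()
    {
      return "offheap";
    }
  }

  /** Stand-in for AppendableIndexBuilder. */
  abstract static class BuilderStub
  {
    protected abstract IndexStub buildInner();

    IndexStub build()
    {
      return buildInner();
    }
  }

  static final class OffheapBuilderStub extends BuilderStub
  {
    // Legal: OffheapIndexStub is a subtype of the declared IndexStub return type.
    @Override
    protected OffheapIndexStub buildInner()
    {
      return new OffheapIndexStub();
    }
  }

  public static void main(String[] args)
  {
    OffheapBuilderStub builder = new OffheapBuilderStub();
    OffheapIndexStub narrow = builder.buildInner(); // no cast needed on the subclass reference
    IndexStub wide = builder.build();               // the base method keeps the base return type
    System.out.println(narrow.kind() + " " + wide.kind()); // offheap offheap
  }
}
```

Callers that go through the base `build()` still see the base type, so they need a cast to reach subclass-specific methods.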

##########
File path: processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
##########
@@ -117,24 +120,24 @@ public String apply(DimensionSpec input)
         .withMinTimestamp(granTimeStart.getMillis())
         .build();
 
+
+    AppendableIndexBuilder indexBuilder;
+
     if (query.getContextValue("useOffheap", false)) {
-      index = new IncrementalIndex.Builder()
-          .setIndexSchema(indexSchema)
-          .setDeserializeComplexMetrics(false)
-          .setConcurrentEventAdd(true)
-          .setSortFacts(sortResults)
-          .setMaxRowCount(querySpecificConfig.getMaxResults())
-          .buildOffheap(bufferPool);
+      indexBuilder = new OffheapIncrementalIndex.Builder()
+          .setBufferPool(bufferPool);
     } else {
-      index = new IncrementalIndex.Builder()
-          .setIndexSchema(indexSchema)
-          .setDeserializeComplexMetrics(false)
-          .setConcurrentEventAdd(true)
-          .setSortFacts(sortResults)
-          .setMaxRowCount(querySpecificConfig.getMaxResults())
-          .buildOnheap();
+      indexBuilder = new OnheapIncrementalIndex.Builder();
     }
 
+    index = indexBuilder

Review comment:
       Great, this reduces duplication.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
##########
@@ -434,4 +436,39 @@ public ColumnCapabilities getColumnCapabilities(String columnName)
     }
   }
 
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Override
+    protected OnheapIncrementalIndex buildInner()

Review comment:
       Likewise, the signature differs from the parent class.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
           new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
       );
 
-      if (i == 0) {
-        aggOffsetInBuffer[i] = 0;
-      } else {
-        aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
-      }
+      aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
+      aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
     }
 
-    aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
-                                                                    - 1].getMaxIntermediateSizeWithNulls();
+    aggsTotalSize = aggsCurOffsetInBuffer;

Review comment:
       why is this correct?






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501210295



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward compatibility to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       I left it in to avoid changing 100+ lines of code.
   It can be removed later in a refactoring PR.
   Do you think I should remove it in this PR?
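The compatibility shim works because each override narrows the return type, and the cast is safe: the parent setter returns `this`, which is already the subclass instance. A reduced, hypothetical sketch of the pattern (`NewBuilder` and `LegacyBuilder` are illustrative names):

```java
// Hypothetical sketch of a backward-compatible builder subclass.
final class CompatBuilderSketch
{
  /** Stand-in for OnheapIncrementalIndex.Builder. */
  static class NewBuilder
  {
    protected int maxRowCount;

    NewBuilder setMaxRowCount(int maxRowCount)
    {
      this.maxRowCount = maxRowCount;
      return this;
    }

    String build()
    {
      return "index(maxRowCount=" + maxRowCount + ")";
    }
  }

  /** Stand-in for the legacy IncrementalIndex.Builder kept for existing call sites. */
  static final class LegacyBuilder extends NewBuilder
  {
    // Narrowed return type keeps chained calls typed as LegacyBuilder; the cast is safe
    // because the parent setter returns `this`.
    @Override
    LegacyBuilder setMaxRowCount(int maxRowCount)
    {
      return (LegacyBuilder) super.setMaxRowCount(maxRowCount);
    }

    // Legacy entry point, in the spirit of buildOnheap(), delegating to the new build().
    String buildOnheap()
    {
      return build();
    }
  }

  public static void main(String[] args)
  {
    // An old-style call chain compiles unchanged against the shim.
    System.out.println(new LegacyBuilder().setMaxRowCount(5).buildOnheap()); // index(maxRowCount=5)
  }
}
```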






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r500316131



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -129,15 +136,11 @@ public FactsHolder getFacts()
           new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
       );
 
-      if (i == 0) {
-        aggOffsetInBuffer[i] = 0;
-      } else {
-        aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
-      }
+      aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
+      aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
     }
 
-    aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
-                                                                    - 1].getMaxIntermediateSizeWithNulls();
+    aggsTotalSize = aggsCurOffsetInBuffer;

Review comment:
       After the last aggregator, the next offset in the buffer is the total size of the buffer.
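This explanation can be checked with a small sketch of the running-offset (prefix-sum) layout. Here `sizes[i]` is a hypothetical stand-in for `metrics[i].getMaxIntermediateSizeWithNulls()`. For sizes 8, 4, and 16, the offsets are 0, 8, and 12, and the total is 28, which is also what the removed formula (last offset plus last size) gives.

```java
import java.util.Arrays;

// Hypothetical sketch of the running-offset layout for aggregator buffers.
final class AggOffsetSketch
{
  /**
   * Returns n + 1 values: entries 0..n-1 are where each aggregator starts,
   * and entry n is the total size (the offset just past the last aggregator).
   */
  static int[] layout(int[] sizes)
  {
    int[] result = new int[sizes.length + 1];
    int curOffset = 0; // plays the role of aggsCurOffsetInBuffer
    for (int i = 0; i < sizes.length; i++) {
      result[i] = curOffset;
      curOffset += sizes[i];
    }
    result[sizes.length] = curOffset; // equals last offset + last size when n > 0
    return result;
  }

  public static void main(String[] args)
  {
    System.out.println(Arrays.toString(layout(new int[]{8, 4, 16}))); // [0, 8, 12, 28]
    System.out.println(Arrays.toString(layout(new int[0])));          // [0]
  }
}
```

One difference in the lines quoted here: with zero aggregators, the old expression would read `aggOffsetInBuffer` at index -1 and throw, while the running offset simply yields 0.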






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501517936



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+  @Nullable
+  protected IncrementalIndexSchema incrementalIndexSchema = null;
+  protected boolean deserializeComplexMetrics = true;
+  protected boolean concurrentEventAdd = false;
+  protected boolean sortFacts = true;
+  protected int maxRowCount = 0;
+  protected long maxBytesInMemory = 0;
+
+  protected final Logger log = new Logger(this.getClass().getName());
+
+  public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
+  {
+    this.incrementalIndexSchema = incrementalIndexSchema;
+    return this;
+  }
+
+  /**
+   * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
+   * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
+   * production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
+  {
+    return setSimpleTestingIndexSchema(null, metrics);
+  }
+
+
+  /**
+   * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+   * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+   * would use it in production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+  {
+    IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+    this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+    return this;
+  }
+
+  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
+  {
+    this.deserializeComplexMetrics = deserializeComplexMetrics;
+    return this;
+  }
+
+  public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
+  {
+    this.concurrentEventAdd = concurrentEventAdd;
+    return this;
+  }
+
+  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+  {
+    this.sortFacts = sortFacts;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+  {
+    this.maxRowCount = maxRowCount;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
+  {
+    this.maxBytesInMemory = maxBytesInMemory;
+    return this;
+  }
+
+  public void validate()
+  {
+    if (maxRowCount <= 0) {
+      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
+    }
+
+    if (incrementalIndexSchema == null) {
+      throw new IllegalArgumentException("incrementIndexSchema cannot be null");
+    }
+  }
+
+  public final IncrementalIndex build()

Review comment:
       Changed to DEBUG.
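The quoted `AppendableIndexBuilder` defines `validate()` and a `final` `build()`, and `OffheapIncrementalIndex.Builder` overrides `validate()` and `buildInner()`. The body of `build()` is not quoted; a minimal sketch, assuming it validates and then delegates to `buildInner()`, could look like this. Apart from `validate`, `build`, and `buildInner`, all names are hypothetical.

```java
import java.util.List;

public final class BuilderTemplateSketch
{
  /** Hypothetical product type standing in for IncrementalIndex. */
  public interface Index
  {
    int maxRowCount();
  }

  public abstract static class Builder
  {
    protected int maxRowCount = 0;

    public Builder setMaxRowCount(int maxRowCount)
    {
      this.maxRowCount = maxRowCount;
      return this;
    }

    /** Subclasses call super.validate() and then check their own settings. */
    public void validate()
    {
      if (maxRowCount <= 0) {
        throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
      }
    }

    /** Assumed flow: final, so every subclass is validated before construction. */
    public final Index build()
    {
      validate();
      return buildInner();
    }

    protected abstract Index buildInner();
  }

  /** Hypothetical subclass with an extra required setting, similar to bufferPool. */
  public static class PooledBuilder extends Builder
  {
    private List<byte[]> pool = null;

    public PooledBuilder setPool(List<byte[]> pool)
    {
      this.pool = pool;
      return this;
    }

    @Override
    public void validate()
    {
      super.validate();
      if (pool == null) {
        throw new IllegalArgumentException("pool cannot be null");
      }
    }

    @Override
    protected Index buildInner()
    {
      final int rows = maxRowCount;
      return () -> rows;
    }
  }
}
```

Making `build()` final keeps the validate-then-construct order fixed, while each subclass only adds its own checks and construction logic.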





[GitHub] [druid] jihoonson commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r508821432



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
##########
@@ -84,10 +87,11 @@ public SeekableStreamIndexTaskTuningConfig(
     // Cannot be a static because default basePersistDirectory is unique per-instance
     final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
 
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
     this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
-    // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    /** initializing this to 0, it will be lazily initialized to a value
+     * @see #getMaxBytesInMemoryOrDefault() */

Review comment:
       Same here.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,9 +1267,10 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
-      // initializing this to 0, it will be lazily initialized to a value
-      // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+      /** initializing this to 0, it will be lazily initialized to a value
+       * @see #getMaxBytesInMemoryOrDefault() */

Review comment:
       nit: we don't use multi-line comments.
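For reference, the pattern these constructors follow (a `null` spec falls back to the default, and `maxBytesInMemory` is stored as `0` and resolved lazily) can be written with single-line `//` comments, in line with the nit. This is a hypothetical sketch: the 1/6-of-max-heap default mirrors the `DEFAULT_MAX_BYTES_IN_MEMORY` line removed from `TuningConfig` in this PR, and `Runtime.maxMemory()` stands in for Druid's `JvmUtils` runtime info.

```java
public final class TuningDefaultsSketch
{
  /** Hypothetical stand-in for AppendableIndexSpec. */
  public interface IndexSpec
  {
    String type();
  }

  public static final IndexSpec DEFAULT_APPENDABLE_INDEX = () -> "onheap";

  private final IndexSpec appendableIndexSpec;
  private final long maxBytesInMemory;

  public TuningDefaultsSketch(IndexSpec appendableIndexSpec, Long maxBytesInMemory)
  {
    // A missing spec falls back to the on-heap default.
    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
    // 0 means "not set"; the effective value is resolved lazily in getMaxBytesInMemoryOrDefault().
    this.maxBytesInMemory = maxBytesInMemory == null ? 0L : maxBytesInMemory;
  }

  public IndexSpec getAppendableIndexSpec()
  {
    return appendableIndexSpec;
  }

  public long getMaxBytesInMemoryOrDefault()
  {
    // Assumed default: 1/6 of the max heap, as in the removed TuningConfig constant.
    return maxBytesInMemory == 0 ? Runtime.getRuntime().maxMemory() / 6 : maxBytesInMemory;
  }
}
```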

##########
File path: docs/ingestion/index.md
##########
@@ -737,3 +741,11 @@ The `indexSpec` object can include the following properties:
 
 Beyond these properties, each ingestion method has its own specific tuning properties. See the documentation for each
 [ingestion method](#ingestion-methods) for details.
+
+#### `appendableIndexSpec`
+
+|Field|Description|Default|
+|-----|-----------|-------|
+|type|Each in-memory index has its own tuning type code. You must specify the type code that matches your in-memory index. Common options are `onheap`, and `offheap`.|`onheap`|
+
+Beyond these properties, each in-memory index has its own specific tuning properties.

Review comment:
       Do you think users would ever want to change this config at this point? I remember @gianm mentioned that the offheap incremental index has some performance issues, which is why we don't use it for indexing. If you think this knob is useful for users, please add more details on how each index type differs and when to use which. Otherwise, I would suggest not documenting this knob, at least for now.
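The proposed docs table maps a `type` code (`onheap` or `offheap`) to an in-memory index implementation, with `onheap` as the default. Judging from the `JsonSubTypes`/`JsonTypeInfo` imports in `AppendableIndexModule`, Druid resolves this through Jackson polymorphic typing. The plain-Java sketch below only illustrates the lookup semantics the table describes; every name in it is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public final class IndexTypeRegistrySketch
{
  /** Hypothetical stand-in for AppendableIndexSpec. */
  public interface Spec
  {
    String type();
  }

  public static final String DEFAULT_TYPE = "onheap";

  // Type codes from the docs table; the Spec objects are placeholders.
  private static final Map<String, Supplier<Spec>> REGISTRY = new HashMap<>();

  static {
    REGISTRY.put("onheap", () -> named("onheap"));
    REGISTRY.put("offheap", () -> named("offheap"));
  }

  private static Spec named(String type)
  {
    return () -> type;
  }

  public static Spec resolve(String type)
  {
    // A missing type falls back to the default, as the table's Default column states.
    String key = type == null ? DEFAULT_TYPE : type;
    Supplier<Spec> factory = REGISTRY.get(key);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown appendable index type: " + key);
    }
    return factory.get();
  }
}
```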





[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501523561



##########
File path: processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       Added. See `AppendableIndexSpecTest`.





[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501213262



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use

Review comment:
       Nice catch





[GitHub] [druid] lgtm-com[bot] commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-705500210


   This pull request **introduces 1 alert** when merging 98788ec97fc49bf2ee8ab97a9348b6a2f9b2684c into 0aa2a8e2c641aa8eb8722b76b205f70f7bbff8cf - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-7caeff7ba7f5df000a9f4d1986554e2b0d9723e5)
   
   **new alerts:**
   
   * 1 for Inconsistent equals and hashCode
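This alert typically flags a class that overrides `equals` without a matching `hashCode`, or vice versa; the report does not say which class triggered it. As a hypothetical illustration, a value-style spec with the `bufferSize`/`cacheSize` fields seen in the off-heap `Spec` keeps the two methods consistent by deriving both from the same fields:

```java
import java.util.Objects;

/** Hypothetical value class; not the actual Druid Spec. */
public final class SizedSpecSketch
{
  private final int bufferSize;
  private final int cacheSize;

  public SizedSpecSketch(int bufferSize, int cacheSize)
  {
    this.bufferSize = bufferSize;
    this.cacheSize = cacheSize;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SizedSpecSketch that = (SizedSpecSketch) o;
    return bufferSize == that.bufferSize && cacheSize == that.cacheSize;
  }

  @Override
  public int hashCode()
  {
    // Uses exactly the fields compared in equals(), so equal objects get equal hashes.
    return Objects.hash(bufferSize, cacheSize);
  }
}
```

One reasonable choice is to leave derived state, such as a buffer pool built in the constructor, out of both methods so that equality reflects configuration only.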



[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501517293



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")
+      );
+    }
+  }
+
+  public static class Spec implements AppendableIndexSpec, Supplier<ByteBuffer>
+  {
+    public static final String TYPE = "offheap";
+    static final int DEFAULT_BUFFER_SIZE = 1 << 23;
+    static final int DEFAULT_CACHE_SIZE = 1 << 30;
+
+    final int bufferSize;
+    final int cacheSize;
+    final NonBlockingPool<ByteBuffer> bufferPool;
+
+    @JsonCreator
+    public Spec(
+        final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+        final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+    )
+    {
+      this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
+      this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
+      this.bufferPool = new StupidPool<>(
+          "Offheap incremental-index buffer pool",
+          this,
+          0,
+          this.cacheSize / this.bufferSize

Review comment:
       Removed
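The quoted `Spec` constructor falls back to defaults when `bufferSize` is missing or non-positive, or when `cacheSize` is missing or not larger than the buffer size, and it caps the pool at `cacheSize / bufferSize` buffers. The comment does not say what was removed. A minimal sketch of just that sizing arithmetic, with hypothetical names and the pool itself omitted:

```java
public final class OffheapSizingSketch
{
  static final int DEFAULT_BUFFER_SIZE = 1 << 23; // 8 MiB
  static final int DEFAULT_CACHE_SIZE = 1 << 30;  // 1 GiB

  final int bufferSize;
  final int cacheSize;

  public OffheapSizingSketch(Integer bufferSize, Integer cacheSize)
  {
    // Same fallback rules as the quoted Spec constructor.
    this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
    this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
  }

  /** Number of buffers the pool may cache; integer division drops any remainder. */
  public int maxCachedBuffers()
  {
    return cacheSize / bufferSize;
  }

  public static void main(String[] args)
  {
    // prints 128 (1 GiB / 8 MiB)
    System.out.println(new OffheapSizingSketch(null, null).maxCachedBuffers());
  }
}
```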

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use

Review comment:
       Fixed

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Fixed

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
##########
@@ -132,6 +136,7 @@ public RealtimeTuningConfig(
       @JsonProperty("dedupColumn") @Nullable String dedupColumn
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Fixed

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward competability to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       After reading your comment about the off-heap builder, I think we have no choice but to keep this builder and add back the `buildOffheap()` method as well.
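The overrides in the quoted `IncrementalIndex.Builder` exist so that a chain of setter calls keeps the subclass type, which is what lets a method such as `buildOnheap()` (or the `buildOffheap()` method discussed here) be called at the end of the chain. A minimal, hypothetical sketch of the same cast-and-return pattern:

```java
public final class CovariantBuilderSketch
{
  public static class BaseBuilder
  {
    protected int maxRowCount;

    public BaseBuilder setMaxRowCount(int maxRowCount)
    {
      this.maxRowCount = maxRowCount;
      return this;
    }
  }

  public static class Builder extends BaseBuilder
  {
    // Covariant override: same behavior, but the static return type is Builder.
    @Override
    public Builder setMaxRowCount(int maxRowCount)
    {
      return (Builder) super.setMaxRowCount(maxRowCount);
    }

    /** Reachable at the end of a chain only because the setter above returns Builder. */
    public String buildOnheap()
    {
      return "onheap index, maxRowCount=" + maxRowCount;
    }
  }

  public static void main(String[] args)
  {
    // prints onheap index, maxRowCount=1000
    System.out.println(new Builder().setMaxRowCount(1000).buildOnheap());
  }
}
```

A self-bounded generic builder (`Builder<T extends Builder<T>>`) is a common alternative that avoids one override per setter, at the cost of a type parameter on the public API.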

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
##########
@@ -17,25 +17,16 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
 
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+public interface AppendableIndexSpec

Review comment:
       Added

##########
File path: extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
##########
@@ -317,6 +318,32 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException
 
   }
 
+  @Test

Review comment:
       Added

##########
File path: processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       This is tested as part of the separate `TuningConfig` tests:
   * `HadoopTuningConfigTest`
   * `ParallelIndexTuningConfigTest`
   * `KafkaIndexTuningConfigTest`
   * `KafkaSupervisorTuningConfigTest`
   * `KinesisIndexTaskTuningConfigTest`
   * `KinesisSupervisorTuningConfigTest`
   * `RealtimeTuningConfigTest`
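Judging from its imports (`JsonTypeInfo`, `JsonSubTypes`, `SimpleModule`), `AppendableIndexModule` registers the concrete index specs as named Jackson subtypes of `AppendableIndexSpec`. That would explain why the `TuningConfig` serde tests listed above exercise it. The sketch below is a plain-Java stand-in for that name-to-class dispatch and is not how Jackson works internally. The type names `onheap` and `offheap` are assumptions. The fallback to on-heap when no type is given mirrors the `DEFAULT_APPENDABLE_INDEX` handling in the tuning config constructors.

```java
import java.util.Map;
import java.util.function.Supplier;

final class IndexSpecRegistrySketch
{
  /** Hypothetical stand-in for AppendableIndexSpec. */
  interface Spec
  {
    String type();
  }

  static final class OnheapSpec implements Spec
  {
    @Override
    public String type()
    {
      return "onheap"; // assumed type name
    }
  }

  static final class OffheapSpec implements Spec
  {
    @Override
    public String type()
    {
      return "offheap"; // assumed type name
    }
  }

  // Plays the role of the @JsonSubTypes list: type name -> factory.
  private static final Map<String, Supplier<Spec>> SUBTYPES = Map.<String, Supplier<Spec>>of(
      "onheap", OnheapSpec::new,
      "offheap", OffheapSpec::new
  );

  /** Resolves a "type" value: no type falls back to on-heap, an unknown name is rejected. */
  static Spec fromType(String type)
  {
    if (type == null) {
      return new OnheapSpec();
    }
    Supplier<Spec> factory = SUBTYPES.get(type);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown appendableIndexSpec type: " + type);
    }
    return factory.get();
  }

  public static void main(String[] args)
  {
    System.out.println(fromType("offheap").type()); // offheap
    System.out.println(fromType(null).type());      // onheap
  }
}
```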

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       I'm unsure why this line needs to be documented. It is no different from any of the other tuning configurations in this constructor.
   `AppendableIndexSpec` is documented, so it is self-explanatory.

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > We should look at incrementally refactoring HadoopTuningConfig.getRowFlushBoundary into getMaxRowsInMemory so that it could be moved into TuningConfig as well.
   
   I opened #10478 for that earlier; it is a short PR.
   If you prefer, I can apply your comments from this PR to #10478 and merge it before this one.
   It addresses both `HadoopTuningConfig.getRowFlushBoundary` and the `TuningConfig` interface refactoring.
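The defaulting rule in `getMaxBytesInMemoryOrDefault()` can be sketched in isolation. This is a minimal illustration, not Druid code. The `LongSupplier` is a hypothetical stand-in for `AppendableIndexSpec.getDefaultMaxBytesInMemory()`. The one-sixth-of-heap default, approximated here with `Runtime.maxMemory()`, is an assumption borrowed from the removed `DEFAULT_MAX_BYTES_IN_MEMORY` constant. Passing a supplier keeps the default lazy, which matches the code comment's intent of sizing it on the task's own JVM.

```java
import java.util.function.LongSupplier;

final class MaxBytesResolutionSketch
{
  /**
   * Resolves the effective byte limit.
   *
   * @param configured  value from the tuning config; 0 means "unset"
   * @param specDefault lazily computes the index implementation's default (hypothetical stand-in)
   */
  static long resolve(long configured, LongSupplier specDefault)
  {
    if (configured > 0) {
      return configured;              // explicit setting wins
    } else if (configured == 0) {
      return specDefault.getAsLong(); // unset: ask the index implementation, on this JVM
    } else {
      return Long.MAX_VALUE;          // negative: effectively no byte-based limit
    }
  }

  public static void main(String[] args)
  {
    // Assumed on-heap default: one sixth of the max heap, as in the removed constant.
    LongSupplier onheapDefault = () -> Runtime.getRuntime().maxMemory() / 6;
    System.out.println(resolve(1_000_000L, onheapDefault));            // 1000000
    System.out.println(resolve(0L, onheapDefault) > 0);                // true
    System.out.println(resolve(-1L, onheapDefault) == Long.MAX_VALUE); // true
  }
}
```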

##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
##########
@@ -115,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
   public void testConvert()
   {
     KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+        null,

Review comment:
       I think it is best to avoid a new constructor here.
   Otherwise, every new parameter would require yet another constructor.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
##########
@@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask(
     boolean resetOffsetAutomatically = false;
     int maxRowsInMemory = 1000;
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
+        null,

Review comment:
       See above.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       I agree, but that would require refactoring some of the `TuningConfig` implementations into one common (possibly abstract) implementation.
   This can be done before or after this PR.
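One possible shape for that refactor is sketched below. A hypothetical common base class applies the `spec == null ? DEFAULT_APPENDABLE_INDEX : spec` rule once, so each concrete tuning config only handles its own fields. All class names are illustrative stand-ins, not Druid classes. The on-heap default and the `1_000_000` row default are taken from `TuningConfig` above. Because unset parameters arrive as `null`, callers can keep using a single constructor when a parameter is added.

```java
final class TuningConfigSketchDemo
{
  public static void main(String[] args)
  {
    // Passing null for every optional parameter yields the defaults.
    StreamTuningConfigSketch config = new StreamTuningConfigSketch(null, null, null);
    System.out.println(config.getAppendableIndexSpec().type()); // onheap
    System.out.println(config.getMaxBytesInMemory());           // 0
    System.out.println(config.getMaxRowsInMemory());            // 1000000
  }
}

/** Hypothetical stand-in for AppendableIndexSpec. */
interface IndexSpecSketch
{
  String type();
}

final class OnheapSpecSketch implements IndexSpecSketch
{
  @Override
  public String type()
  {
    return "onheap";
  }
}

/** Hypothetical common base: null-defaulting for shared fields lives in exactly one place. */
abstract class BaseTuningConfigSketch
{
  static final IndexSpecSketch DEFAULT_APPENDABLE_INDEX = new OnheapSpecSketch();

  private final IndexSpecSketch appendableIndexSpec;
  private final long maxBytesInMemory;

  protected BaseTuningConfigSketch(IndexSpecSketch appendableIndexSpec, Long maxBytesInMemory)
  {
    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
    // 0 means "unset"; the real default is resolved later, on the task's JVM.
    this.maxBytesInMemory = maxBytesInMemory == null ? 0L : maxBytesInMemory;
  }

  IndexSpecSketch getAppendableIndexSpec()
  {
    return appendableIndexSpec;
  }

  long getMaxBytesInMemory()
  {
    return maxBytesInMemory;
  }
}

/** Hypothetical concrete config: only its own fields need defaulting here. */
final class StreamTuningConfigSketch extends BaseTuningConfigSketch
{
  private final int maxRowsInMemory;

  StreamTuningConfigSketch(IndexSpecSketch appendableIndexSpec, Long maxBytesInMemory, Integer maxRowsInMemory)
  {
    super(appendableIndexSpec, maxBytesInMemory);
    this.maxRowsInMemory = maxRowsInMemory == null ? 1_000_000 : maxRowsInMemory;
  }

  int getMaxRowsInMemory()
  {
    return maxRowsInMemory;
  }
}
```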

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       See above.

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
##########
@@ -302,11 +301,11 @@ private static IncrementalIndex makeIncrementalIndex(
         .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
         .build();
 
-    IncrementalIndex newIndex = new IncrementalIndex.Builder()
+    IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()

Review comment:
       Done

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > We should look at incrementally refactoring HadoopTuningConfig.getRowFlushBoundary into getMaxRowsInMemory so that it could be moved into TuningConfig as well.
   
   I opened #10478 for that earlier; it is a short PR.
   If you prefer, I can apply your comments from this PR to #10478 and merge it before this one.
   It addresses both `HadoopTuningConfig.getRowFlushBoundary` and the `TuningConfig` interface refactoring.
   
   EDIT: I already applied your comments there.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only for backward compatibility, to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       Done. See #10494
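The legacy `IncrementalIndex.Builder` above relies on covariant return types. Each setter is overridden only to cast the parent's `this` back to the subclass, so existing call chains that end in a subclass-only method such as `buildOnheap()` keep compiling. Here is a stripped-down sketch with hypothetical class names:

```java
final class CovariantBuilderDemo
{
  public static void main(String[] args)
  {
    // Compiles only because LegacyBuilderSketch overrides setMaxRowCount with a narrower return type.
    String index = new LegacyBuilderSketch().setMaxRowCount(5).buildOnheap();
    System.out.println(index); // onheap(maxRowCount=5)
  }
}

/** Hypothetical stand-in for the new shared builder. */
class BaseBuilderSketch
{
  protected int maxRowCount = 0;

  BaseBuilderSketch setMaxRowCount(int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }
}

/** Hypothetical stand-in for the backward-compatible IncrementalIndex.Builder. */
class LegacyBuilderSketch extends BaseBuilderSketch
{
  @Override
  LegacyBuilderSketch setMaxRowCount(int maxRowCount)
  {
    // super.setMaxRowCount returns `this`, so this cast cannot fail.
    return (LegacyBuilderSketch) super.setMaxRowCount(maxRowCount);
  }

  String buildOnheap()
  {
    return "onheap(maxRowCount=" + maxRowCount + ")";
  }
}
```

A self-referential generic builder (`Base<B extends Base<B>>`) is a common alternative that avoids the casts, at the cost of noisier type signatures.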






[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-705564572








[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501212288



##########
File path: processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
##########
@@ -117,24 +120,24 @@ public String apply(DimensionSpec input)
         .withMinTimestamp(granTimeStart.getMillis())
         .build();
 
+
+    AppendableIndexBuilder indexBuilder;

Review comment:
       Yes.






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501829305



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
##########
@@ -115,6 +115,7 @@ public void testSerdeWithNonDefaults() throws Exception
   public void testConvert()
   {
     KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+        null,

Review comment:
       I think it is best to avoid a new constructor here.
   Otherwise, every new parameter would require yet another constructor.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
##########
@@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask(
     boolean resetOffsetAutomatically = false;
     int maxRowsInMemory = 1000;
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
+        null,

Review comment:
       See above.






[GitHub] [druid] a2l007 commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
a2l007 commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r499697190



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+  @Nullable
+  protected IncrementalIndexSchema incrementalIndexSchema = null;
+  protected boolean deserializeComplexMetrics = true;
+  protected boolean concurrentEventAdd = false;
+  protected boolean sortFacts = true;
+  protected int maxRowCount = 0;
+  protected long maxBytesInMemory = 0;
+
+  protected final Logger log = new Logger(this.getClass().getName());
+
+  public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
+  {
+    this.incrementalIndexSchema = incrementalIndexSchema;
+    return this;
+  }
+
+  /**
+   * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
+   * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
+   * production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
+  {
+    return setSimpleTestingIndexSchema(null, metrics);
+  }
+
+
+  /**
+   * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+   * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+   * would use it in production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+  {
+    IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+    this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+    return this;
+  }
+
+  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
+  {
+    this.deserializeComplexMetrics = deserializeComplexMetrics;
+    return this;
+  }
+
+  public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
+  {
+    this.concurrentEventAdd = concurrentEventAdd;
+    return this;
+  }
+
+  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+  {
+    this.sortFacts = sortFacts;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+  {
+    this.maxRowCount = maxRowCount;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
+  {
+    this.maxBytesInMemory = maxBytesInMemory;
+    return this;
+  }
+
+  public void validate()
+  {
+    if (maxRowCount <= 0) {
+      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
+    }
+
+    if (incrementalIndexSchema == null) {
+      throw new IllegalArgumentException("incrementIndexSchema cannot be null");
+    }
+  }
+
+  public final IncrementalIndex build()

Review comment:
       Would it make sense to make this abstract instead of having a separate `buildInner()` method? Validation could then be left to each implementation.
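For comparison, the current shape is a template method. `build()` is `final`, runs the shared `validate()` checks, and then delegates to an implementation hook. This is presumably the `buildInner()` the comment mentions, although its exact signature is not shown in this hunk. The sketch below uses hypothetical names and a `String` result in place of `IncrementalIndex`. Keeping `build()` final guarantees validation cannot be skipped. Making it abstract, as suggested, trades that guarantee for letting each implementation own its checks.

```java
final class TemplateBuilderDemo
{
  public static void main(String[] args)
  {
    System.out.println(new OnheapIndexBuilderSketch().setSchema("schema").setMaxRowCount(10).build());
    // prints: onheap index, maxRowCount=10
    try {
      new OnheapIndexBuilderSketch().setSchema("schema").build(); // maxRowCount left at 0
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Invalid max row count: 0
    }
  }
}

/** Hypothetical stand-in for AppendableIndexBuilder; Object replaces IncrementalIndexSchema. */
abstract class IndexBuilderSketch<T>
{
  protected Object schema = null;
  protected int maxRowCount = 0;

  IndexBuilderSketch<T> setSchema(Object schema)
  {
    this.schema = schema;
    return this;
  }

  IndexBuilderSketch<T> setMaxRowCount(int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }

  /** Shared invariants; an implementation may override and call super.validate(). */
  void validate()
  {
    if (maxRowCount <= 0) {
      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
    }
    if (schema == null) {
      throw new IllegalArgumentException("incrementalIndexSchema cannot be null");
    }
  }

  /** Implementation-specific construction, reached only after validate() succeeds. */
  protected abstract T buildInner();

  final T build()
  {
    validate();
    return buildInner();
  }
}

final class OnheapIndexBuilderSketch extends IndexBuilderSketch<String>
{
  @Override
  protected String buildInner()
  {
    return "onheap index, maxRowCount=" + maxRowCount;
  }
}
```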






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501831217



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       I agree, but that would require refactoring some of the `TuningConfig` implementations into one common (possibly abstract) implementation.
   This can be done before or after this PR.






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501819535



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       I'm unsure why this line needs to be documented. It is no different from any of the other tuning configurations in this constructor.
   `AppendableIndexSpec` is documented, so it is self-explanatory.






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501600513



##########
File path: processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       This is tested as part of the separate `TuningConfig` tests:
   * `HadoopTuningConfigTest`
   * `ParallelIndexTuningConfigTest`
   * `KafkaIndexTuningConfigTest`
   * `KafkaSupervisorTuningConfigTest`
   * `KinesisIndexTaskTuningConfigTest`
   * `KinesisSupervisorTuningConfigTest`
   * `RealtimeTuningConfigTest`






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501222707



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only for backward compatibility, to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       After reading your comment about the off-heap builder, I think we have no choice but to keep this builder and add back the `buildOffheap()` method as well.






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r509187892



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,9 +1267,10 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
-      // initializing this to 0, it will be lazily initialized to a value
-      // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+      /** initializing this to 0, it will be lazily initialized to a value
+       * @see #getMaxBytesInMemoryOrDefault() */

Review comment:
       Fixed.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
##########
@@ -84,10 +87,11 @@ public SeekableStreamIndexTaskTuningConfig(
     // Cannot be a static because default basePersistDirectory is unique per-instance
     final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
 
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
     this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
-    // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    /** initializing this to 0, it will be lazily initialized to a value
+     * @see #getMaxBytesInMemoryOrDefault() */

Review comment:
       Fixed.
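Both fixed comments now point at `getMaxBytesInMemoryOrDefault()`, the default method this PR adds to `TuningConfig`. The constructors store 0 when `maxBytesInMemory` is null, and the effective value is resolved later on the task's own JVM. The branch logic can be sketched as a standalone helper. `MaxBytesResolver.resolve` is a hypothetical name, and `indexDefault` stands in for `getAppendableIndexSpec().getDefaultMaxBytesInMemory()`:

```java
// Hypothetical helper mirroring the branches of TuningConfig#getMaxBytesInMemoryOrDefault() in this PR.
class MaxBytesResolver
{
  /**
   * @param configured   value stored by the tuning config constructor; 0 means "not set"
   * @param indexDefault default supplied by the appendable index spec (stand-in)
   * @return the effective in-memory byte limit
   */
  static long resolve(long configured, long indexDefault)
  {
    if (configured > 0) {
      // Explicit user-provided limit.
      return configured;
    } else if (configured == 0) {
      // Unset: defer to the index implementation, evaluated lazily on the task's JVM.
      return indexDefault;
    } else {
      // Negative: effectively no byte-based limit.
      return Long.MAX_VALUE;
    }
  }
}
```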






[GitHub] [druid] a2l007 commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
a2l007 commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501099215



##########
File path: processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
##########
@@ -117,24 +120,24 @@ public String apply(DimensionSpec input)
         .withMinTimestamp(granTimeStart.getMillis())
         .build();
 
+
+    AppendableIndexBuilder indexBuilder;

Review comment:
       Can this be final?
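Assuming `indexBuilder` is assigned once in each branch of an if/else just below the truncated hunk, it can be declared `final`. Java's definite-assignment rules accept a blank `final` local as long as every path assigns it exactly once. Below is a minimal sketch using a hypothetical `IndexBuilder` stand-in type:

```java
class FinalLocalSketch
{
  // Hypothetical stand-in for AppendableIndexBuilder and its subclasses.
  interface IndexBuilder
  {
    String name();
  }

  static String choose(boolean useOffheap)
  {
    // A blank final local: the compiler verifies it is assigned exactly once on every path.
    final IndexBuilder indexBuilder;
    if (useOffheap) {
      indexBuilder = () -> "offheap";
    } else {
      indexBuilder = () -> "onheap";
    }
    return indexBuilder.name();
  }
}
```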

##########
File path: processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       Could you please add a test for this module?

##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     // initializing this to 0, it will be lazily initialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Comment needs to be updated

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
##########
@@ -93,6 +96,7 @@ public RealtimeAppenderatorTuningConfig(
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     // initializing this to 0, it will be lazily intialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Comment needs to be modified.

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
##########
@@ -132,6 +136,7 @@ public RealtimeTuningConfig(
       @JsonProperty("dedupColumn") @Nullable String dedupColumn
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       This comment is no longer valid. Please update accordingly.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
##########
@@ -17,25 +17,16 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
 
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+public interface AppendableIndexSpec

Review comment:
       Please add javadocs.
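Below is a sketch of the kind of javadoc that could go on the interface. Only `getDefaultMaxBytesInMemory()` comes from this PR, where `TuningConfig#getMaxBytesInMemoryOrDefault()` calls it. The interface and class names are hypothetical. The 1/6-of-max-heap on-heap default is an assumption, based on the `DEFAULT_MAX_BYTES_IN_MEMORY` constant this PR removes from `TuningConfig`:

```java
/**
 * Hypothetical, simplified sketch of a documented appendable-index spec.
 * A spec selects and configures the in-memory (incremental) index used during ingestion.
 */
interface AppendableIndexSpecSketch
{
  /**
   * Returns the byte budget the in-memory index should use when the tuning config
   * leaves maxBytesInMemory unset (stored as 0).
   *
   * @return default maximum number of (estimated) bytes to hold in memory before persisting
   */
  long getDefaultMaxBytesInMemory();
}

/** Illustrative on-heap spec; the 1/6-of-max-heap value mirrors the constant removed from TuningConfig. */
class OnheapSpecSketch implements AppendableIndexSpecSketch
{
  @Override
  public long getDefaultMaxBytesInMemory()
  {
    // Runtime#maxMemory() stands in for Druid's JvmUtils runtime info in this sketch.
    return Runtime.getRuntime().maxMemory() / 6;
  }
}
```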

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward competability to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       Do we still need this method?

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")

Review comment:
       Isn't validate() already checking this?
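Suppose `build()` calls `validate()` before `buildInner()`. This is an assumption, because its body is cut off in the `AppendableIndexBuilder` hunk. In that case, the `Objects.requireNonNull(bufferPool, ...)` in `buildInner()` can never fail when reached through `build()`, and it mainly serves as a hint to nullness checkers, since `bufferPool` is `@Nullable`. Below is a minimal template-method sketch with hypothetical names, using a `String` in place of `NonBlockingPool<ByteBuffer>`:

```java
import java.util.Objects;

// Hypothetical stand-in for AppendableIndexBuilder.
abstract class TemplateBuilderSketch<T>
{
  protected int maxRowCount = 0;

  public TemplateBuilderSketch<T> setMaxRowCount(int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }

  public void validate()
  {
    if (maxRowCount <= 0) {
      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
    }
  }

  // Template method: final, so subclasses cannot skip validation.
  public final T build()
  {
    validate();
    return buildInner();
  }

  protected abstract T buildInner();
}

// Hypothetical stand-in for OffheapIncrementalIndex.Builder.
class PoolBuilderSketch extends TemplateBuilderSketch<String>
{
  private String bufferPool = null; // stands in for NonBlockingPool<ByteBuffer>

  PoolBuilderSketch setBufferPool(String bufferPool)
  {
    this.bufferPool = bufferPool;
    return this;
  }

  @Override
  public void validate()
  {
    super.validate();
    if (bufferPool == null) {
      throw new IllegalArgumentException("bufferPool cannot be null");
    }
  }

  @Override
  protected String buildInner()
  {
    // Redundant when reached via build(), but documents non-nullness at the use site.
    return "index(rows=" + maxRowCount + ", pool=" + Objects.requireNonNull(bufferPool, "bufferPool is null") + ")";
  }
}
```

Subclasses override only `validate()` and `buildInner()`, so a missing pool is reported as an `IllegalArgumentException` from `validate()` before any index is constructed.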

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       Why do we need to move these out of `AppenderatorConfig`?

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use

Review comment:
       nit: typo for incremental

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Comment needs to be modified.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+  @Nullable
+  protected IncrementalIndexSchema incrementalIndexSchema = null;
+  protected boolean deserializeComplexMetrics = true;
+  protected boolean concurrentEventAdd = false;
+  protected boolean sortFacts = true;
+  protected int maxRowCount = 0;
+  protected long maxBytesInMemory = 0;
+
+  protected final Logger log = new Logger(this.getClass().getName());
+
+  public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
+  {
+    this.incrementalIndexSchema = incrementalIndexSchema;
+    return this;
+  }
+
+  /**
+   * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
+   * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
+   * production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
+  {
+    return setSimpleTestingIndexSchema(null, metrics);
+  }
+
+
+  /**
+   * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+   * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+   * would use it in production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+  {
+    IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+    this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+    return this;
+  }
+
+  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
+  {
+    this.deserializeComplexMetrics = deserializeComplexMetrics;
+    return this;
+  }
+
+  public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
+  {
+    this.concurrentEventAdd = concurrentEventAdd;
+    return this;
+  }
+
+  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+  {
+    this.sortFacts = sortFacts;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+  {
+    this.maxRowCount = maxRowCount;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
+  {
+    this.maxBytesInMemory = maxBytesInMemory;
+    return this;
+  }
+
+  public void validate()
+  {
+    if (maxRowCount <= 0) {
+      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
+    }
+
+    if (incrementalIndexSchema == null) {
+      throw new IllegalArgumentException("incrementIndexSchema cannot be null");
+    }
+  }
+
+  public final IncrementalIndex build()

Review comment:
       buildInner should be okay. 
   It might be better to switch the log level to DEBUG though.

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")
+      );
+    }
+  }
+
+  public static class Spec implements AppendableIndexSpec, Supplier<ByteBuffer>
+  {
+    public static final String TYPE = "offheap";
+    static final int DEFAULT_BUFFER_SIZE = 1 << 23;
+    static final int DEFAULT_CACHE_SIZE = 1 << 30;
+
+    final int bufferSize;
+    final int cacheSize;
+    final NonBlockingPool<ByteBuffer> bufferPool;
+
+    @JsonCreator
+    public Spec(
+        final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+        final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+    )
+    {
+      this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
+      this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
+      this.bufferPool = new StupidPool<>(
+          "Offheap incremental-index buffer pool",
+          this,
+          0,
+          this.cacheSize / this.bufferSize

Review comment:
       The changes in this class beyond conforming to the AppendableIndexSpec interface might be out of the scope of this PR and might require their own tests. Would it make sense to propose these changes in a separate, incremental PR once the AppendableIndexSpec changes are merged?
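The `Spec` constructor's defaulting is subtle enough to merit the dedicated tests suggested here. With the defaults, the pool cap is (1 << 30) / (1 << 23) = 128 buffers of 8 MiB each. Below is a sketch that isolates the sizing expressions from the hunk. The class and method names are hypothetical, and the result mirrors the last `StupidPool` constructor argument:

```java
// Hypothetical extraction of the sizing logic in OffheapIncrementalIndex.Spec's constructor.
class OffheapPoolSizing
{
  static final int DEFAULT_BUFFER_SIZE = 1 << 23; // 8 MiB
  static final int DEFAULT_CACHE_SIZE = 1 << 30;  // 1 GiB

  static int effectiveBufferSize(Integer bufferSize)
  {
    return bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
  }

  static int effectiveCacheSize(Integer cacheSize, int effectiveBufferSize)
  {
    // A cache size not strictly larger than the buffer size silently falls back to the default.
    return cacheSize != null && cacheSize > effectiveBufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
  }

  /** Value passed as the pool's last constructor argument in the hunk: cacheSize / bufferSize. */
  static int maxPooledBuffers(Integer bufferSize, Integer cacheSize)
  {
    int buffer = effectiveBufferSize(bufferSize);
    int cache = effectiveCacheSize(cacheSize, buffer);
    return cache / buffer; // integer division
  }
}
```

One edge case such tests could cover: in this sketch, a `bufferSize` larger than 1 GiB with no explicit `cacheSize` yields 0, because the cache size falls back to the 1 GiB default.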






[GitHub] [druid] a2l007 commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
a2l007 commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r505591694



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
##########
@@ -281,7 +292,8 @@ public boolean equals(Object o)
       return false;
     }
     SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory &&
+    return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+           maxRowsInMemory == that.maxRowsInMemory &&

Review comment:
       Could you please write an EqualsVerifier test for this? That should take care of the Travis failures.
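EqualsVerifier (`EqualsVerifier.forClass(...).verify()`) checks that `equals()` and `hashCode()` honor their contract over the same fields. That is the kind of mismatch a newly added field such as `appendableIndexSpec` can introduce. EqualsVerifier is an external test dependency, so below is a dependency-free sketch of the contract it enforces, using a hypothetical, trimmed-down config class:

```java
import java.util.Objects;

// Hypothetical, trimmed-down tuning config; Object stands in for AppendableIndexSpec.
final class TuningConfigSketch
{
  private final Object appendableIndexSpec;
  private final int maxRowsInMemory;
  private final long maxBytesInMemory;

  TuningConfigSketch(Object appendableIndexSpec, int maxRowsInMemory, long maxBytesInMemory)
  {
    this.appendableIndexSpec = appendableIndexSpec;
    this.maxRowsInMemory = maxRowsInMemory;
    this.maxBytesInMemory = maxBytesInMemory;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TuningConfigSketch that = (TuningConfigSketch) o;
    return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
           maxRowsInMemory == that.maxRowsInMemory &&
           maxBytesInMemory == that.maxBytesInMemory;
  }

  @Override
  public int hashCode()
  {
    // Must cover every field compared in equals(), including the new appendableIndexSpec.
    return Objects.hash(appendableIndexSpec, maxRowsInMemory, maxBytesInMemory);
  }
}
```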






[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-712906532


   @jihoonson Do you have any more comments or can we proceed to merge this PR?




[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-705564572


   Thanks, @a2l007
   I addressed all your comments, either in the code or with follow-up questions.
   Regarding the integration tests, I added tests to all the `TuningConfig` test classes to validate the `AppendableIndexSpec` within each `TuningConfig`.
   I think this is sufficient. Do you agree?




[GitHub] [druid] jihoonson merged pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #10335:
URL: https://github.com/apache/druid/pull/10335


   




[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501848342



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward competability to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       Done. See #10494
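The override-and-cast pattern in the `Builder` diff above keeps chained setter calls typed as the subclass. Below is a minimal sketch of the idea. The class names (`BaseIndexBuilder`, `OnheapBuilder`) and the `describe()` method are hypothetical and are not Druid's real builders:

```java
// Hypothetical, simplified builders illustrating covariant setter overrides.
abstract class BaseIndexBuilder
{
  protected int maxRowCount = 0;
  protected long maxBytesInMemory = 0;

  BaseIndexBuilder setMaxRowCount(final int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }

  BaseIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
  {
    this.maxBytesInMemory = maxBytesInMemory;
    return this;
  }
}

class OnheapBuilder extends BaseIndexBuilder
{
  // Same behaviour as the parent setter, but with a narrower (covariant) return
  // type, so a chain such as setMaxRowCount(...).describe() still compiles.
  @Override
  OnheapBuilder setMaxRowCount(final int maxRowCount)
  {
    return (OnheapBuilder) super.setMaxRowCount(maxRowCount);
  }

  @Override
  OnheapBuilder setMaxBytesInMemory(final long maxBytesInMemory)
  {
    return (OnheapBuilder) super.setMaxBytesInMemory(maxBytesInMemory);
  }

  // Subclass-only method that a chain would lose without the overrides.
  String describe()
  {
    return "onheap rows=" + maxRowCount + " bytes=" + maxBytesInMemory;
  }
}
```

For example, `new OnheapBuilder().setMaxRowCount(1000).setMaxBytesInMemory(1L << 20).describe()` returns `"onheap rows=1000 bytes=1048576"`. The cast is safe only because every base setter returns `this`. A self-typed generic builder (`Builder<T extends Builder<T>>`) is a common alternative that avoids the casts, at the cost of an extra type parameter.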






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r509189589



##########
File path: docs/ingestion/index.md
##########
@@ -737,3 +741,11 @@ The `indexSpec` object can include the following properties:
 
 Beyond these properties, each ingestion method has its own specific tuning properties. See the documentation for each
 [ingestion method](#ingestion-methods) for details.
+
+#### `appendableIndexSpec`
+
+|Field|Description|Default|
+|-----|-----------|-------|
+|type|Each in-memory index has its own tuning type code. You must specify the type code that matches your in-memory index. Common options are `onheap`, and `offheap`.|`onheap`|
+
+Beyond these properties, each in-memory index has its own specific tuning properties.

Review comment:
       Actually, at @a2l007's suggestion, I deferred support for offheap ingestion to a separate PR.
   So this documentation should not include the `offheap` option anyway.
   I'll remove this documentation for now.






[GitHub] [druid] a2l007 commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
a2l007 commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501740100



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
##########
@@ -140,6 +144,7 @@ public HadoopTuningConfig(
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       @liran-funaro Could you please address this?

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > I was wondering why they are in `AppenderatorConfig` in the first place.
   
   I'm not sure. Maybe @jihoonson can help answer this.
   
   
   > So I think it is best that as much of the common API as possible will be in `TuningConfig`.
   > And these methods are indeed in common.
   
   We should look at incrementally refactoring `HadoopTuningConfig.getRowFlushBoundary` into `getMaxRowsInMemory` so that it could be moved into TuningConfig as well.
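The three-way fallback in `getMaxBytesInMemoryOrDefault()` can be exercised in isolation. The sketch below copies its branches into a hypothetical `MaxBytesResolver` helper and replaces `AppendableIndexSpec` with a one-method stand-in. The onheap default of one sixth of the max heap is an assumption based on the removed `DEFAULT_MAX_BYTES_IN_MEMORY` constant. This diff does not show the new spec actually computing it that way.

```java
// Hypothetical one-method stand-in for AppendableIndexSpec.
interface DefaultMaxBytesProvider
{
  long getDefaultMaxBytesInMemory();
}

final class MaxBytesResolver
{
  private MaxBytesResolver()
  {
  }

  // Assumed onheap default: 1/6 of the max heap, as the removed
  // DEFAULT_MAX_BYTES_IN_MEMORY constant computed it. Evaluated lazily,
  // so it reflects the JVM that actually calls it.
  static final DefaultMaxBytesProvider ASSUMED_ONHEAP_DEFAULT =
      () -> Runtime.getRuntime().maxMemory() / 6;

  // Same branches as getMaxBytesInMemoryOrDefault():
  // > 0 is an explicit limit, 0 means "use the index type's default",
  // < 0 disables the byte limit.
  static long resolve(final long configured, final DefaultMaxBytesProvider spec)
  {
    if (configured > 0) {
      return configured;
    } else if (configured == 0) {
      return spec.getDefaultMaxBytesInMemory();
    } else {
      return Long.MAX_VALUE;
    }
  }
}
```

Storing `0` as a sentinel, rather than a computed byte count, matches the rationale in the code comment above. The default is computed on the task's own JVM, not on whichever process first constructed the config.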

##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only for backward compatibility, to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       Sounds reasonable. Please create a GitHub issue for this so that we can track the incremental refactoring effort.






[GitHub] [druid] jihoonson commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-715448671


   @liran-funaro we usually wait a while before merging a PR in case other committers want to review it as well. I will merge this PR today unless someone else has more comments.




[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501221001



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")
+      );
+    }
+  }
+
+  public static class Spec implements AppendableIndexSpec, Supplier<ByteBuffer>
+  {
+    public static final String TYPE = "offheap";
+    static final int DEFAULT_BUFFER_SIZE = 1 << 23;
+    static final int DEFAULT_CACHE_SIZE = 1 << 30;
+
+    final int bufferSize;
+    final int cacheSize;
+    final NonBlockingPool<ByteBuffer> bufferPool;
+
+    @JsonCreator
+    public Spec(
+        final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+        final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+    )
+    {
+      this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
+      this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
+      this.bufferPool = new StupidPool<>(
+          "Offheap incremental-index buffer pool",
+          this,
+          0,
+          this.cacheSize / this.bufferSize

Review comment:
       Yes. That is possible. I will separate this part into a PR that will follow this one.
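For reference, here is how the sizing rules in the quoted `Spec` constructor work out. This sketch copies only the arithmetic into a hypothetical `OffheapSizing` class. It assumes the last `StupidPool` argument is the maximum number of cached buffers. The quoted snippet suggests this but does not show it.

```java
// Hypothetical class reproducing only the size arithmetic of the quoted Spec.
final class OffheapSizing
{
  static final int DEFAULT_BUFFER_SIZE = 1 << 23; // 8 MiB, as in the quoted Spec
  static final int DEFAULT_CACHE_SIZE = 1 << 30;  // 1 GiB, as in the quoted Spec

  final int bufferSize;
  final int cacheSize;

  OffheapSizing(final Integer bufferSize, final Integer cacheSize)
  {
    // Missing or non-positive buffer size -> default.
    this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
    // Missing cache size, or one not larger than a single buffer -> default.
    this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
  }

  // The value passed as the pool's last argument in the quoted code.
  int maxCachedBuffers()
  {
    return cacheSize / bufferSize;
  }
}
```

With both values omitted, this gives 1 GiB / 8 MiB = 128 buffers. A `cacheSize` that is not larger than `bufferSize` silently falls back to 1 GiB instead of being rejected. If `bufferSize` is above 1 GiB and `cacheSize` is missing or too small, that fallback is smaller than one buffer, so the quotient becomes 0.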






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501522998



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
##########
@@ -17,25 +17,16 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
 
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+@UnstableApi
+public interface AppendableIndexSpec

Review comment:
       Added






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501218301



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")

Review comment:
       It does, but without this, Intellij warns about "Argument might be null".
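The pattern under discussion validates in `validate()` and then repeats the null check with `Objects.requireNonNull` inside `buildInner()`. It can be sketched as below. Two parts of the sketch are assumptions: the `build()` template method calling `validate()` before `buildInner()` reflects a guess about how `AppendableIndexBuilder` wires the two, and `PooledIndexBuilder` is hypothetical, with a `String` standing in for the buffer pool.

```java
import java.util.Objects;

// Hypothetical template: build() always runs validate() before buildInner().
abstract class ValidatingBuilder<T>
{
  final T build()
  {
    validate();
    return buildInner();
  }

  // Subclasses add their own checks and call super.validate().
  protected void validate()
  {
  }

  protected abstract T buildInner();
}

final class PooledIndexBuilder extends ValidatingBuilder<String>
{
  // A String stands in for the nullable NonBlockingPool<ByteBuffer> field.
  private String bufferPool = null;

  PooledIndexBuilder setBufferPool(final String bufferPool)
  {
    this.bufferPool = bufferPool;
    return this;
  }

  @Override
  protected void validate()
  {
    super.validate();
    if (bufferPool == null) {
      throw new IllegalArgumentException("bufferPool cannot be null");
    }
  }

  @Override
  protected String buildInner()
  {
    // validate() already rejected null; requireNonNull states the invariant
    // to static analysis (the IntelliJ warning mentioned in the comment) and
    // still fails fast if buildInner() is ever reached without validate().
    return "index using " + Objects.requireNonNull(bufferPool, "bufferPool is null");
  }
}
```

On the normal path, the second check never fires. Its job is to turn a nullness warning into an explicit invariant.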






[GitHub] [druid] lgtm-com[bot] commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-705500210


   This pull request **introduces 1 alert** when merging 98788ec97fc49bf2ee8ab97a9348b6a2f9b2684c into 0aa2a8e2c641aa8eb8722b76b205f70f7bbff8cf - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-7caeff7ba7f5df000a9f4d1986554e2b0d9723e5)
   
   **new alerts:**
   
   * 1 for Inconsistent equals and hashCode
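The alert does not say which class was flagged. As a general illustration, a value-style spec keeps `equals` and `hashCode` consistent by deriving both from the same fields. `SizedSpec` below is hypothetical and mirrors only the two sizing fields of the offheap `Spec` quoted in this thread.

```java
import java.util.Objects;

// Hypothetical value class; only configuration fields take part in equality.
final class SizedSpec
{
  final int bufferSize;
  final int cacheSize;

  SizedSpec(final int bufferSize, final int cacheSize)
  {
    this.bufferSize = bufferSize;
    this.cacheSize = cacheSize;
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final SizedSpec that = (SizedSpec) o;
    return bufferSize == that.bufferSize && cacheSize == that.cacheSize;
  }

  @Override
  public int hashCode()
  {
    // Hash exactly the fields compared in equals(), so equal specs always
    // produce equal hash codes.
    return Objects.hash(bufferSize, cacheSize);
  }
}
```

As I understand it, this LGTM query reports classes that override only one of the two methods. Overriding both from the same field list, as here, satisfies the `Object` contract.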




[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501831424



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;

Review comment:
       See above.






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501216090



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       We don't have to. I was wondering why they are in `AppenderatorConfig` in the first place.
   The only reason `AppenderatorConfig` exists is that `HadoopTuningConfig` doesn't share all of the `TuningConfig` API with the rest of the "appenderators".
   So I think it is best that as much of the common API as possible will be in `TuningConfig`.
   And these methods are indeed in common.






[GitHub] [druid] liran-funaro edited a comment on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro edited a comment on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-713496656


   @jihoonson Thanks. I addressed all your comments.
   Please let me know if you see any other required modifications, or whether we can proceed to merge this PR.




[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-715372075


   @jihoonson Do we need more approval before merge?




[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-714791644


   Passed CI.
   Thanks @jihoonson and @a2l007
   




[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501835878



##########
File path: indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
##########
@@ -302,11 +301,11 @@ private static IncrementalIndex makeIncrementalIndex(
         .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
         .build();
 
-    IncrementalIndex newIndex = new IncrementalIndex.Builder()
+    IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()

Review comment:
       Done
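The change above replaces a hard-coded `new IncrementalIndex.Builder()` with a builder obtained from the configured spec. Below is a rough sketch of that indirection. A plain map stands in for however Druid actually resolves the `type` field; that is presumably Jackson polymorphic deserialization, which is not reproduced here. Every class name below is hypothetical. A missing type falls back to the onheap default, as in `appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, heavily simplified stand-ins for the real interfaces.
interface SketchIndexBuilder
{
  String build();
}

interface SketchIndexSpec
{
  // Each index type hands out its own builder, so callers never name a concrete index class.
  SketchIndexBuilder builder();
}

final class SketchSpecs
{
  // spec -> builder -> index, each expressed as a lambda.
  private static final SketchIndexSpec ONHEAP = () -> () -> "onheap index";

  private final Map<String, Supplier<SketchIndexSpec>> byType = new HashMap<>();

  SketchSpecs()
  {
    byType.put("onheap", () -> ONHEAP);
  }

  void register(final String type, final Supplier<SketchIndexSpec> factory)
  {
    byType.put(type, factory);
  }

  // No type configured -> default spec; unknown type -> error.
  SketchIndexSpec resolve(final String type)
  {
    if (type == null) {
      return ONHEAP;
    }
    final Supplier<SketchIndexSpec> factory = byType.get(type);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown index type: " + type);
    }
    return factory.get();
  }
}
```

Because callers only see the spec interface, adding another index type later means registering a new spec rather than editing call sites such as `IndexGeneratorJob`.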






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r505752280



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       Thanks, @jihoonson. I will revert this API change.








[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r500312831



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+  @Nullable
+  protected IncrementalIndexSchema incrementalIndexSchema = null;
+  protected boolean deserializeComplexMetrics = true;
+  protected boolean concurrentEventAdd = false;
+  protected boolean sortFacts = true;
+  protected int maxRowCount = 0;
+  protected long maxBytesInMemory = 0;
+
+  protected final Logger log = new Logger(this.getClass().getName());
+
+  public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
+  {
+    this.incrementalIndexSchema = incrementalIndexSchema;
+    return this;
+  }
+
+  /**
+   * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
+   * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
+   * production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
+  {
+    return setSimpleTestingIndexSchema(null, metrics);
+  }
+
+
+  /**
+   * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+   * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+   * would use it in production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+  {
+    IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+    this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+    return this;
+  }
+
+  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
+  {
+    this.deserializeComplexMetrics = deserializeComplexMetrics;
+    return this;
+  }
+
+  public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
+  {
+    this.concurrentEventAdd = concurrentEventAdd;
+    return this;
+  }
+
+  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+  {
+    this.sortFacts = sortFacts;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+  {
+    this.maxRowCount = maxRowCount;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
+  {
+    this.maxBytesInMemory = maxBytesInMemory;
+    return this;
+  }
+
+  public void validate()
+  {
+    if (maxRowCount <= 0) {
+      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
+    }
+
+    if (incrementalIndexSchema == null) {
+      throw new IllegalArgumentException("incrementalIndexSchema cannot be null");
+    }
+  }
+
+  public final IncrementalIndex build()

Review comment:
       You're right, it would be more elegant without this `buildInner()` method.
   But I was trying to avoid code duplication: having to call `validate()` and `log.info()` in each `build()` implementation.
   `build()` is marked `final` precisely to enforce this behavior.
   I could rename `buildInner()` to something else, for example `instantiate()`.
   What do you think?
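
   A minimal, simplified sketch of the pattern under discussion: a `final` `build()` runs the shared validation and logging exactly once, then delegates construction to a protected `buildInner()` hook. `IndexSketch`, `SketchIndexBuilder`, and `SketchOnheapBuilder` are hypothetical stand-ins rather than Druid classes, the schema is a plain `String`, and `System.out` stands in for Druid's `Logger`.

```java
// Hypothetical stand-in for IncrementalIndex; not a Druid class.
interface IndexSketch
{
  int maxRowCount();
}

// Simplified sketch of the template-method shape: build() is fixed, buildInner() varies.
abstract class SketchIndexBuilder
{
  protected String schema = null;  // stands in for IncrementalIndexSchema
  protected int maxRowCount = 0;

  SketchIndexBuilder setSchema(String schema)
  {
    this.schema = schema;
    return this;
  }

  SketchIndexBuilder setMaxRowCount(int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }

  // Subclasses may add checks, but should call super.validate() first.
  void validate()
  {
    if (maxRowCount <= 0) {
      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
    }
    if (schema == null) {
      throw new IllegalArgumentException("incrementalIndexSchema cannot be null");
    }
  }

  // final: every builder validates and logs once, in the same order, before construction.
  final IndexSketch build()
  {
    validate();
    System.out.println("Building " + getClass().getSimpleName() + " with maxRowCount=" + maxRowCount);
    return buildInner();
  }

  // The only step an implementation has to supply.
  protected abstract IndexSketch buildInner();
}

class SketchOnheapBuilder extends SketchIndexBuilder
{
  @Override
  protected IndexSketch buildInner()
  {
    final int rows = maxRowCount;  // already validated by build()
    return () -> rows;
  }
}
```

   Renaming the hook (for example to `instantiate()`) would not change this structure; only the name of the abstract method differs.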






[GitHub] [druid] jihoonson commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r505740755



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       Oops, sorry. I missed the previous ping. 
   
   > I was wondering why they are in `AppenderatorConfig` in the first place.
   
   `indexSpecForIntermediatePersists` was added in #7919. I'm not aware of the exact reason, but it seems reasonable to have it only in `AppenderatorConfig`, because that parameter is coupled with how `Appenderator` spills intermediate segments. The Hadoop task is somewhat special. I think it was implemented before we added `Appenderator` (or at least before `AppenderatorDriver`), and we were able to implement `Appenderator` in a more structured way based on the lessons we learned from the Hadoop task. So it works similarly to the other tasks that use `Appenderator`, but not exactly the same way.
   
   Regarding the interface change: even though all the tasks we currently have use a similar spilling mechanism, that might not be true in the future. So I would suggest not modifying the interface unless you have to.
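
   A hedged sketch of the layering being argued for, using simplified hypothetical types: the generic tuning-config interface stays small, and only a sub-interface for tasks that spill the `Appenderator` way exposes the intermediate-persist spec. `BaseTuningSketch`, `SpillingTuningSketch`, and `SpecSketch` are illustrative stand-ins for `TuningConfig`, `AppenderatorConfig`, and `IndexSpec`; the actual Druid hierarchy may differ.

```java
// Hypothetical stand-in for IndexSpec.
final class SpecSketch
{
  final String label;

  SpecSketch(String label)
  {
    this.label = label;
  }
}

// Generic config that every ingestion task implements (stand-in for TuningConfig).
interface BaseTuningSketch
{
  SpecSketch getIndexSpec();
}

// Only tasks that spill intermediate segments the Appenderator way need this knob
// (stand-in for AppenderatorConfig).
interface SpillingTuningSketch extends BaseTuningSketch
{
  SpecSketch getIndexSpecForIntermediatePersists();
}

// A task that spills like Appenderator implements the richer interface...
class StreamingTuningSketch implements SpillingTuningSketch
{
  private final SpecSketch indexSpec = new SpecSketch("final");
  private final SpecSketch intermediate = new SpecSketch("intermediate");

  @Override
  public SpecSketch getIndexSpec()
  {
    return indexSpec;
  }

  @Override
  public SpecSketch getIndexSpecForIntermediatePersists()
  {
    return intermediate;
  }
}

// ...while a task with a different spilling mechanism is not forced to implement it.
class OtherSpillingTuningSketch implements BaseTuningSketch
{
  @Override
  public SpecSketch getIndexSpec()
  {
    return new SpecSketch("final");
  }
}
```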






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501822993



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       > We should look at incrementally refactoring HadoopTuningConfig.getRowFlushBoundary into getMaxRowsInMemory so that it could be moved into TuningConfig as well.
   
   I opened #10478 for that earlier; it is short.
   If you prefer, I can apply your comments from this PR to #10478 and merge it before this one.
   It addresses both `HadoopTuningConfig.getRowFlushBoundary` and the `TuningConfig` interface refactoring.
   
   EDIT: I already applied your comments there.
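
   One possible shape for that incremental refactoring, as a hedged sketch with simplified stand-in classes (not necessarily how #10478 does it): the Hadoop config implements the shared `getMaxRowsInMemory()`, and the old `getRowFlushBoundary()` name survives only as a deprecated alias, so existing callers keep compiling. `RowsTuningSketch` and `HadoopTuningSketch` are hypothetical; the 1,000,000 default is the `DEFAULT_MAX_ROWS_IN_MEMORY` value quoted in the diff.

```java
// Simplified stand-in for the shared TuningConfig interface.
interface RowsTuningSketch
{
  int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;

  int getMaxRowsInMemory();
}

// Simplified stand-in for HadoopTuningConfig.
class HadoopTuningSketch implements RowsTuningSketch
{
  private final int maxRowsInMemory;

  // A null (unset) value falls back to the shared default.
  HadoopTuningSketch(Integer rowFlushBoundary)
  {
    this.maxRowsInMemory = rowFlushBoundary == null ? DEFAULT_MAX_ROWS_IN_MEMORY : rowFlushBoundary;
  }

  @Override
  public int getMaxRowsInMemory()
  {
    return maxRowsInMemory;
  }

  // Old name kept only as an alias that delegates to the common method.
  @Deprecated
  public int getRowFlushBoundary()
  {
    return getMaxRowsInMemory();
  }
}
```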






[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-709566461


   @a2l007 Thanks!




[GitHub] [druid] jihoonson commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-715652644


   @liran-funaro merged. Thanks!




[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r500317091



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
##########
@@ -346,4 +349,99 @@ public void close()
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()

Review comment:
       It is legal for an implementation to increase the requirements, but not to lower them,
   as long as it conforms to the interface's contract.
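
   To make the point concrete, a minimal sketch with hypothetical, simplified classes: the off-heap-style builder still runs every base check through `super.validate()` and only adds one more requirement on top. The `Object` buffer pool is a placeholder for `NonBlockingPool<ByteBuffer>`.

```java
class BaseBuilderSketch
{
  protected int maxRowCount = 0;

  BaseBuilderSketch setMaxRowCount(int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }

  void validate()
  {
    if (maxRowCount <= 0) {
      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
    }
  }
}

class OffheapBuilderSketch extends BaseBuilderSketch
{
  // Placeholder for NonBlockingPool<ByteBuffer>.
  private Object bufferPool = null;

  OffheapBuilderSketch setBufferPool(Object bufferPool)
  {
    this.bufferPool = bufferPool;
    return this;
  }

  @Override
  void validate()
  {
    super.validate();  // every base requirement still applies
    if (bufferPool == null) {
      throw new IllegalArgumentException("bufferPool cannot be null");
    }
  }
}
```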






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r505697287



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
##########
@@ -281,7 +292,8 @@ public boolean equals(Object o)
       return false;
     }
     SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory &&
+    return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+           maxRowsInMemory == that.maxRowsInMemory &&

Review comment:
       @a2l007 Thanks for the tip. I wasn't aware of `EqualsVerifier`.
   I added it, and the unit tests now pass, but some integration tests that passed before are now failing.
   I suspect an issue with the integration tests themselves, because the master branch sometimes fails them as well.
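
   For context, a simplified sketch of the kind of `equals()`/`hashCode()` pair being checked, with the new `appendableIndexSpec` field included in both methods. `TuningConfigSketch` is a hypothetical two-field stand-in, and the spec is a `String` instead of an `AppendableIndexSpec`. A check such as `EqualsVerifier.forClass(SomeConfig.class).usingGetClass().verify()` is meant to catch mistakes like comparing a field in `equals()` but leaving it out of `hashCode()`.

```java
import java.util.Objects;

// Hypothetical stand-in for a tuning config with the new field.
final class TuningConfigSketch
{
  private final String appendableIndexSpec;
  private final int maxRowsInMemory;

  TuningConfigSketch(String appendableIndexSpec, int maxRowsInMemory)
  {
    this.appendableIndexSpec = appendableIndexSpec;
    this.maxRowsInMemory = maxRowsInMemory;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TuningConfigSketch that = (TuningConfigSketch) o;
    // Objects.equals tolerates a null spec on either side.
    return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
           maxRowsInMemory == that.maxRowsInMemory;
  }

  @Override
  public int hashCode()
  {
    // Every field compared in equals() also feeds the hash.
    return Objects.hash(appendableIndexSpec, maxRowsInMemory);
  }
}
```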






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501222707



##########
File path: processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
##########
@@ -319,126 +316,62 @@ protected IncrementalIndex(
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward compatibility to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()

Review comment:
       After reading your comment about the off-heap builder, I think it is unavoidable to keep this builder and to add back the `buildOffheap()` method as well.
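
   The legacy `IncrementalIndex.Builder` in the diff keeps its fluent API by overriding each inherited setter with a covariant return type and casting the result of the `super` call. A minimal sketch of that technique with hypothetical class names: the casts are safe because each base setter returns `this`, which is already the subclass instance. `buildOnheap()`/`buildOffheap()` return placeholder `String`s here instead of real indexes.

```java
class BaseFluentBuilder
{
  protected int maxRowCount = 0;
  protected boolean sortFacts = true;

  BaseFluentBuilder setMaxRowCount(int maxRowCount)
  {
    this.maxRowCount = maxRowCount;
    return this;
  }

  BaseFluentBuilder setSortFacts(boolean sortFacts)
  {
    this.sortFacts = sortFacts;
    return this;
  }
}

// Legacy-style builder: narrows each setter's return type so chained calls
// still reach the subclass-only terminal methods.
class LegacyFluentBuilder extends BaseFluentBuilder
{
  @Override
  LegacyFluentBuilder setMaxRowCount(int maxRowCount)
  {
    // super returns `this`, which is a LegacyFluentBuilder, so the cast cannot fail.
    return (LegacyFluentBuilder) super.setMaxRowCount(maxRowCount);
  }

  @Override
  LegacyFluentBuilder setSortFacts(boolean sortFacts)
  {
    return (LegacyFluentBuilder) super.setSortFacts(sortFacts);
  }

  // Placeholder terminal methods standing in for buildOnheap()/buildOffheap().
  String buildOnheap()
  {
    return "onheap(maxRowCount=" + maxRowCount + ", sortFacts=" + sortFacts + ")";
  }

  String buildOffheap()
  {
    return "offheap(maxRowCount=" + maxRowCount + ", sortFacts=" + sortFacts + ")";
  }
}
```

   Without the overrides, `setSortFacts(false)` would return `BaseFluentBuilder`, and the chained `buildOnheap()` call would not compile.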






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501529552



##########
File path: extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
##########
@@ -317,6 +318,32 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException
 
   }
 
+  @Test

Review comment:
       Added






[GitHub] [druid] liran-funaro commented on pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on pull request #10335:
URL: https://github.com/apache/druid/pull/10335#issuecomment-705693671


   > Could you please fix the CI failures as well? Looks like there is coverage failure.
   
   The coverage gap comes from `ParallelIndexTuningConfig.equals()`, and it existed before this PR; I only added another comparison.
   The existing tests cover 10 of its 20 branches, and covering the rest would require writing 10 additional unique tests just for this method.
   




[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r505546862



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  PartitionsSpec getPartitionsSpec();
+
+  IndexSpec getIndexSpec();
+
+  IndexSpec getIndexSpecForIntermediatePersists();

Review comment:
       Hi @jihoonson. Can you chime in to resolve this issue?






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501517535



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -32,11 +35,43 @@
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The inceremental index implementation to use

Review comment:
       Fixed

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
##########
@@ -1262,6 +1267,7 @@ private IndexTuningConfig(
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Fixed

##########
File path: server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
##########
@@ -132,6 +136,7 @@ public RealtimeTuningConfig(
       @JsonProperty("dedupColumn") @Nullable String dedupColumn
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)

Review comment:
       Fixed
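
   Taken together, the constructor fix (falling back to `DEFAULT_APPENDABLE_INDEX` when `appendableIndexSpec` is null) and the quoted `getMaxBytesInMemoryOrDefault()` give three cases for `maxBytesInMemory`: a positive value is used as-is, 0 defers to the index spec's own default, and a negative value means no byte limit. A self-contained sketch with simplified stand-in types; the 1/6-of-max-heap default for the on-heap spec is an assumption borrowed from the removed `DEFAULT_MAX_BYTES_IN_MEMORY` constant.

```java
// Stand-in for AppendableIndexSpec.
interface IndexSpecSketch
{
  long getDefaultMaxBytesInMemory();
}

// Stand-in for OnheapIncrementalIndex.Spec; the 1/6-of-max-heap default is an assumption.
class OnheapSpecSketch implements IndexSpecSketch
{
  @Override
  public long getDefaultMaxBytesInMemory()
  {
    return Runtime.getRuntime().maxMemory() / 6;
  }
}

class RealtimeTuningSketch
{
  static final IndexSpecSketch DEFAULT_APPENDABLE_INDEX = new OnheapSpecSketch();

  private final IndexSpecSketch appendableIndexSpec;
  private final long maxBytesInMemory;

  RealtimeTuningSketch(IndexSpecSketch appendableIndexSpec, Long maxBytesInMemory)
  {
    // Null means "not set": fall back to the default index type.
    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
    // Null becomes 0, so the real default is resolved later on the task's own JVM.
    this.maxBytesInMemory = maxBytesInMemory == null ? 0L : maxBytesInMemory;
  }

  long getMaxBytesInMemoryOrDefault()
  {
    if (maxBytesInMemory > 0) {
      return maxBytesInMemory;
    } else if (maxBytesInMemory == 0) {
      return appendableIndexSpec.getDefaultMaxBytesInMemory();
    } else {
      return Long.MAX_VALUE;  // negative: effectively no byte limit
    }
  }
}
```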






[GitHub] [druid] liran-funaro commented on a change in pull request #10335: Configurable Index Type

Posted by GitBox <gi...@apache.org>.
liran-funaro commented on a change in pull request #10335:
URL: https://github.com/apache/druid/pull/10335#discussion_r501523561



##########
File path: processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.jackson;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+
+public class AppendableIndexModule extends SimpleModule

Review comment:
       Added. See `AppendableIndexSpecTest`.
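
   `AppendableIndexModule` registers the spec subtypes with Jackson (`@JsonTypeInfo`/`@JsonSubTypes`), so a type name in the JSON tuning config selects the implementation. As a dependency-free illustration of the property a test of this module would typically check (each registered type name resolves to the matching implementation, and unknown names are rejected), the sketch below swaps Jackson for a plain name-to-factory map. The type names `onheap`/`offheap` and all class names are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

interface AppendableSpecSketch
{
  String type();
}

final class OnheapSpecStub implements AppendableSpecSketch
{
  @Override
  public String type()
  {
    return "onheap";
  }
}

final class OffheapSpecStub implements AppendableSpecSketch
{
  @Override
  public String type()
  {
    return "offheap";
  }
}

// Plays the role of the Jackson subtype registration: type name -> factory.
final class SpecRegistrySketch
{
  private final Map<String, Supplier<AppendableSpecSketch>> subtypes = new LinkedHashMap<>();

  SpecRegistrySketch()
  {
    subtypes.put("onheap", OnheapSpecStub::new);
    subtypes.put("offheap", OffheapSpecStub::new);
  }

  AppendableSpecSketch fromType(String type)
  {
    Supplier<AppendableSpecSketch> factory = subtypes.get(type);
    if (factory == null) {
      throw new IllegalArgumentException("Unknown appendable index type: " + type);
    }
    return factory.get();
  }
}
```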



