You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2022/10/22 00:25:44 UTC

[druid] branch master updated: Remove basePersistDirectory from tuning configs. (#13040)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d98c808d3f Remove basePersistDirectory from tuning configs. (#13040)
d98c808d3f is described below

commit d98c808d3f087645d8dcfd516e77b0058bdfd389
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Fri Oct 21 17:25:36 2022 -0700

    Remove basePersistDirectory from tuning configs. (#13040)
    
    * Remove basePersistDirectory from tuning configs.
    
    Since the removal of CliRealtime, it serves no purpose: it is always
    overridden in production via withBasePersistDirectory, which is given
    some subdirectory of the task work directory.
    
    Removing this from the tuning config has a benefit beyond removing
    no-longer-needed logic: it also avoids the side effect of empty
    "druid-realtime-persist" directories getting created in the systemwide
    temp directory.
    
    * Test adjustments to appropriately set basePersistDirectory.
    
    * Remove unused import.
    
    * Fix RATC (RealtimeAppenderatorTuningConfig) constructor.
---
 .../indexing/kafka/KafkaIndexTaskTuningConfig.java |  55 ++++++-
 .../supervisor/KafkaSupervisorTuningConfig.java    |   8 +-
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |  12 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |   6 -
 .../KafkaSupervisorTuningConfigTest.java           |   6 +-
 .../TestModifiedKafkaIndexTaskTuningConfig.java    |   4 +-
 .../kinesis/KinesisIndexTaskTuningConfig.java      |  93 +++++++++---
 .../supervisor/KinesisSupervisorTuningConfig.java  |   8 +-
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |  11 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   4 -
 .../KinesisSupervisorTuningConfigTest.java         |   6 +-
 .../TestModifiedKinesisIndexTaskTuningConfig.java  |   4 +-
 .../index/RealtimeAppenderatorTuningConfig.java    |  95 ++++++++----
 .../SeekableStreamIndexTaskTuningConfig.java       |   3 +-
 .../segment/indexing/RealtimeTuningConfig.java     | 105 ++++++++-----
 .../segment/indexing/RealtimeTuningConfigTest.java |  13 +-
 .../appenderator/AppenderatorPlumberTest.java      |  24 ++-
 .../appenderator/StreamAppenderatorDriverTest.java |  13 +-
 .../appenderator/StreamAppenderatorTest.java       | 165 +++++++++++++++------
 .../appenderator/StreamAppenderatorTester.java     | 115 ++++++++------
 .../plumber/RealtimePlumberSchoolTest.java         |   7 +-
 21 files changed, 500 insertions(+), 257 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index c0dfb334e8..be3aac9ac2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -32,8 +32,54 @@ import java.io.File;
 
 public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
 {
-  @JsonCreator
   public KafkaIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      @Nullable Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      @Nullable Period intermediatePersistPeriod,
+      @Nullable File basePersistDirectory,
+      @Nullable Integer maxPendingPersists,
+      @Nullable IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      @Nullable Boolean reportParseExceptions,
+      @Nullable Long handoffConditionTimeout,
+      @Nullable Boolean resetOffsetAutomatically,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Period intermediateHandoffPeriod,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
+  )
+  {
+    super(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        false,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
+  @JsonCreator
+  private KafkaIndexTaskTuningConfig(
       @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@@ -41,7 +87,6 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -55,7 +100,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
-    super(
+    this(
         appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
@@ -63,14 +108,13 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
         reportParseExceptions,
         handoffConditionTimeout,
         resetOffsetAutomatically,
-        false,
         segmentWriteOutMediumFactory,
         intermediateHandoffPeriod,
         logParseExceptions,
@@ -116,7 +160,6 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
            ", maxBytesInMemory=" + getMaxBytesInMemory() +
            ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 4fc5489a8d..596e532924 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -29,7 +29,6 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
     implements SeekableStreamSupervisorTuningConfig
@@ -67,7 +66,6 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
         null,
         null,
         null,
-        null,
         null
     );
   }
@@ -80,7 +78,6 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -108,7 +105,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
@@ -198,7 +195,6 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
            ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
            ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", reportParseExceptions=" + isReportParseExceptions() +
@@ -229,7 +225,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
         getMaxRowsPerSegment(),
         getMaxTotalRows(),
         getIntermediatePersistPeriod(),
-        getBasePersistDirectory(),
+        null,
         getMaxPendingPersists(),
         getIndexSpec(),
         getIndexSpecForIntermediatePersists(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 6674c36f17..2c5bad93cb 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -61,7 +61,7 @@ public class KafkaIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -102,7 +102,7 @@ public class KafkaIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
@@ -127,7 +127,6 @@ public class KafkaIndexTaskTuningConfigTest
         2,
         10L,
         new Period("PT3S"),
-        new File("/tmp/xxx"),
         4,
         new IndexSpec(),
         new IndexSpec(),
@@ -146,7 +145,7 @@ public class KafkaIndexTaskTuningConfigTest
         null,
         null
     );
-    KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
+    KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
 
     Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
     Assert.assertEquals(1, copy.getMaxRowsInMemory());
@@ -154,7 +153,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertNotEquals(null, copy.getMaxTotalRows());
     Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
     Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
-    Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
+    Assert.assertNull(copy.getBasePersistDirectory());
     Assert.assertEquals(4, copy.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
     Assert.assertEquals(true, copy.isReportParseExceptions());
@@ -197,7 +196,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
     Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
     Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
-    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertNull(deserialized.getBasePersistDirectory());
     Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
     Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
     Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
@@ -221,7 +220,6 @@ public class KafkaIndexTaskTuningConfigTest
         2,
         10L,
         new Period("PT3S"),
-        new File("/tmp/xxx"),
         4,
         new IndexSpec(),
         new IndexSpec(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c30795cdd1..78b010b3b3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -115,7 +115,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -313,7 +312,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
             50000,
             null,
             new Period("P1Y"),
-            new File("/test"),
             null,
             null,
             null,
@@ -3771,7 +3769,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
             50000,
             null,
             new Period("P1Y"),
-            new File("/test"),
             null,
             null,
             null,
@@ -3811,7 +3808,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -4138,7 +4134,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -4252,7 +4247,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 7151eb0681..5ac97b12c8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -32,8 +32,6 @@ import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-
 public class KafkaSupervisorTuningConfigTest
 {
   private final ObjectMapper mapper;
@@ -59,7 +57,7 @@ public class KafkaSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -110,7 +108,7 @@ public class KafkaSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 642141c3c4..fd6cbb48b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -29,7 +29,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 @JsonTypeName("KafkaTuningConfig")
 public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuningConfig
@@ -45,7 +44,6 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -68,7 +66,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 21c0510758..4e9dbafb7c 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -47,8 +47,72 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
   private final Integer fetchThreads;
   private final int maxRecordsPerPoll;
 
-  @JsonCreator
   public KinesisIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      Integer maxRowsInMemory,
+      Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      Integer maxRowsPerSegment,
+      Long maxTotalRows,
+      Period intermediatePersistPeriod,
+      File basePersistDirectory,
+      Integer maxPendingPersists,
+      IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      Boolean reportParseExceptions,
+      Long handoffConditionTimeout,
+      Boolean resetOffsetAutomatically,
+      Boolean skipSequenceNumberAvailabilityCheck,
+      Integer recordBufferSize,
+      Integer recordBufferOfferTimeout,
+      Integer recordBufferFullWait,
+      Integer fetchThreads,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions,
+      @Nullable Integer maxRecordsPerPoll,
+      @Nullable Period intermediateHandoffPeriod
+  )
+  {
+    super(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        skipSequenceNumberAvailabilityCheck,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+    this.recordBufferSize = recordBufferSize == null ? DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
+    this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
+                                    ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
+                                    : recordBufferOfferTimeout;
+    this.recordBufferFullWait = recordBufferFullWait == null ? DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
+    this.fetchThreads = fetchThreads; // we handle this being null later
+    this.maxRecordsPerPoll = maxRecordsPerPoll == null ? DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
+
+    Preconditions.checkArgument(
+        !(super.isResetOffsetAutomatically() && super.isSkipSequenceNumberAvailabilityCheck()),
+        "resetOffsetAutomatically cannot be used if skipSequenceNumberAvailabilityCheck=true"
+    );
+  }
+
+  @JsonCreator
+  private KinesisIndexTaskTuningConfig(
       @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@@ -56,7 +120,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -76,7 +139,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
       @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod
   )
   {
-    super(
+    this(
         appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
@@ -84,7 +147,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
@@ -92,23 +155,16 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
         handoffConditionTimeout,
         resetOffsetAutomatically,
         skipSequenceNumberAvailabilityCheck,
+        recordBufferSize,
+        recordBufferOfferTimeout,
+        recordBufferFullWait,
+        fetchThreads,
         segmentWriteOutMediumFactory,
-        intermediateHandoffPeriod,
         logParseExceptions,
         maxParseExceptions,
-        maxSavedParseExceptions
-    );
-    this.recordBufferSize = recordBufferSize == null ? DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
-    this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
-                                    ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
-                                    : recordBufferOfferTimeout;
-    this.recordBufferFullWait = recordBufferFullWait == null ? DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
-    this.fetchThreads = fetchThreads; // we handle this being null later
-    this.maxRecordsPerPoll = maxRecordsPerPoll == null ? DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
-
-    Preconditions.checkArgument(
-        !(super.isResetOffsetAutomatically() && super.isSkipSequenceNumberAvailabilityCheck()),
-        "resetOffsetAutomatically cannot be used if skipSequenceNumberAvailabilityCheck=true"
+        maxSavedParseExceptions,
+        maxRecordsPerPoll,
+        intermediateHandoffPeriod
     );
   }
 
@@ -217,7 +273,6 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", reportParseExceptions=" + isReportParseExceptions() +
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 57bae9f2a8..ba8f4311f0 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -29,7 +29,6 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
     implements SeekableStreamSupervisorTuningConfig
@@ -79,7 +78,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         null,
         null,
         null,
-        null,
         null
     );
   }
@@ -92,7 +90,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -129,7 +126,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
@@ -239,7 +236,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", reportParseExceptions=" + isReportParseExceptions() +
@@ -278,7 +274,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         getMaxRowsPerSegment(),
         getMaxTotalRows(),
         getIntermediatePersistPeriod(),
-        getBasePersistDirectory(),
+        null,
         getMaxPendingPersists(),
         getIndexSpec(),
         getIndexSpecForIntermediatePersists(),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 9aa1f6127b..61db57449d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -67,7 +67,7 @@ public class KinesisIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -115,7 +115,7 @@ public class KinesisIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
@@ -173,7 +173,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
     Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
     Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
-    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertNull(deserialized.getBasePersistDirectory());
     Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
     Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
     Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
@@ -231,7 +231,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
     Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
     Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
-    Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+    Assert.assertNull(deserialized.getBasePersistDirectory());
     Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
     Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
     Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
@@ -286,7 +286,6 @@ public class KinesisIndexTaskTuningConfigTest
         2,
         100L,
         new Period("PT3S"),
-        new File("/tmp/xxx"),
         4,
         new IndexSpec(),
         new IndexSpec(),
@@ -322,7 +321,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(100L, (long) copy.getMaxTotalRows());
     Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
-    Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
+    Assert.assertNull(copy.getBasePersistDirectory());
     Assert.assertEquals(4, copy.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
     Assert.assertTrue(copy.isReportParseExceptions());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 99d8d5223b..be836567eb 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -103,7 +103,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -185,7 +184,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -3948,7 +3946,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -5061,7 +5058,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index c043cbdea5..a5c48b35b6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -31,8 +31,6 @@ import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-
 public class KinesisSupervisorTuningConfigTest
 {
   private final ObjectMapper mapper;
@@ -58,7 +56,7 @@ public class KinesisSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -106,7 +104,7 @@ public class KinesisSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index c6ce08ab2a..2772cc5794 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -29,7 +29,6 @@ import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 @JsonTypeName("KinesisTuningConfig")
 public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTuningConfig
@@ -45,7 +44,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -74,7 +72,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 172d8eb0f1..8b11eddb7c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -48,11 +47,6 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
   private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
   private static final long DEFAULT_ALERT_TIMEOUT = 0;
 
-  private static File createNewBasePersistDirectory()
-  {
-    return FileUtils.createTempDir("druid-realtime-persist");
-  }
-
   private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
@@ -74,27 +68,26 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
   private final int maxParseExceptions;
   private final int maxSavedParseExceptions;
 
-  @JsonCreator
   public RealtimeAppenderatorTuningConfig(
-      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
-      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
-      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
-      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
-      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
-      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
-      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
-      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
-      @JsonProperty("shardSpec") ShardSpec shardSpec,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
-      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
-      @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
-      @JsonProperty("alertTimeout") Long alertTimeout,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
-      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
-      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      Period intermediatePersistPeriod,
+      File basePersistDirectory,
+      Integer maxPendingPersists,
+      ShardSpec shardSpec,
+      IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      Boolean reportParseExceptions,
+      Long publishAndHandoffTimeout,
+      Long alertTimeout,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -108,7 +101,7 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
                                      : intermediatePersistPeriod;
-    this.basePersistDirectory = basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory;
+    this.basePersistDirectory = basePersistDirectory;
     this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
     this.shardSpec = shardSpec == null ? DEFAULT_SHARD_SPEC : shardSpec;
     this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@@ -142,6 +135,51 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
                               : logParseExceptions;
   }
 
+  @JsonCreator
+  private RealtimeAppenderatorTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("shardSpec") ShardSpec shardSpec,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
+      @JsonProperty("alertTimeout") Long alertTimeout,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+  )
+  {
+    this(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        null,
+        maxPendingPersists,
+        shardSpec,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        publishAndHandoffTimeout,
+        alertTimeout,
+        segmentWriteOutMediumFactory,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
   @Override
   @JsonProperty
   public AppendableIndexSpec getAppendableIndexSpec()
@@ -199,10 +237,9 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
   }
 
   @Override
-  @JsonProperty
   public File getBasePersistDirectory()
   {
-    return basePersistDirectory;
+    return Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory not set");
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 2d3445f3a8..904f2820af 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -98,7 +98,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? defaults.getIntermediatePersistPeriod()
                                      : intermediatePersistPeriod;
-    this.basePersistDirectory = defaults.getBasePersistDirectory();
+    this.basePersistDirectory = basePersistDirectory;
     this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists;
     this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
     this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
@@ -193,7 +193,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
   }
 
   @Override
-  @JsonProperty
   public File getBasePersistDirectory()
   {
     return basePersistDirectory;
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 6901fafd79..84061711b3 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -56,11 +55,6 @@ public class RealtimeTuningConfig implements AppenderatorConfig
   private static final long DEFAULT_ALERT_TIMEOUT = 0;
   private static final String DEFAULT_DEDUP_COLUMN = null;
 
-  private static File createNewBasePersistDirectory()
-  {
-    return FileUtils.createTempDir("druid-realtime-persist");
-  }
-
   // Might make sense for this to be a builder
   public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory)
   {
@@ -71,7 +65,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
         DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
         DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
         DEFAULT_WINDOW_PERIOD,
-        basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory,
+        basePersistDirectory,
         DEFAULT_VERSIONING_POLICY,
         DEFAULT_REJECTION_POLICY_FACTORY,
         DEFAULT_MAX_PENDING_PERSISTS,
@@ -111,28 +105,27 @@ public class RealtimeTuningConfig implements AppenderatorConfig
   @Nullable
   private final String dedupColumn;
 
-  @JsonCreator
   public RealtimeTuningConfig(
-      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
-      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
-      @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
-      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
-      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
-      @JsonProperty("windowPeriod") Period windowPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
-      @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
-      @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
-      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
-      @JsonProperty("shardSpec") ShardSpec shardSpec,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
-      @JsonProperty("persistThreadPriority") int persistThreadPriority,
-      @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
-      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
-      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
-      @JsonProperty("alertTimeout") Long alertTimeout,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("dedupColumn") @Nullable String dedupColumn
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      Integer maxRowsInMemory,
+      Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      Period intermediatePersistPeriod,
+      Period windowPeriod,
+      File basePersistDirectory,
+      VersioningPolicy versioningPolicy,
+      RejectionPolicyFactory rejectionPolicyFactory,
+      Integer maxPendingPersists,
+      ShardSpec shardSpec,
+      IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      int persistThreadPriority,
+      int mergeThreadPriority,
+      Boolean reportParseExceptions,
+      Long handoffConditionTimeout,
+      Long alertTimeout,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable String dedupColumn
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -146,8 +139,8 @@ public class RealtimeTuningConfig implements AppenderatorConfig
                                      ? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
                                      : intermediatePersistPeriod;
     this.windowPeriod = windowPeriod == null ? DEFAULT_WINDOW_PERIOD : windowPeriod;
-    this.basePersistDirectory = basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory;
-    this.versioningPolicy = versioningPolicy == null ? DEFAULT_VERSIONING_POLICY : versioningPolicy;
+    this.basePersistDirectory = basePersistDirectory;
+    this.versioningPolicy = versioningPolicy;
     this.rejectionPolicyFactory = rejectionPolicyFactory == null
                                   ? DEFAULT_REJECTION_POLICY_FACTORY
                                   : rejectionPolicyFactory;
@@ -172,6 +165,52 @@ public class RealtimeTuningConfig implements AppenderatorConfig
     this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN : dedupColumn;
   }
 
+  @JsonCreator
+  private RealtimeTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+      @JsonProperty("windowPeriod") Period windowPeriod,
+      @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("shardSpec") ShardSpec shardSpec,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+      @JsonProperty("persistThreadPriority") int persistThreadPriority,
+      @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+      @JsonProperty("alertTimeout") Long alertTimeout,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("dedupColumn") @Nullable String dedupColumn
+  )
+  {
+    this(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        intermediatePersistPeriod,
+        windowPeriod,
+        null,
+        null,
+        rejectionPolicyFactory,
+        maxPendingPersists,
+        shardSpec,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        persistThreadPriority,
+        mergeThreadPriority,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        alertTimeout,
+        segmentWriteOutMediumFactory,
+        dedupColumn
+    );
+  }
+
   @Override
   @JsonProperty
   public AppendableIndexSpec getAppendableIndexSpec()
@@ -214,16 +253,14 @@ public class RealtimeTuningConfig implements AppenderatorConfig
   }
 
   @Override
-  @JsonProperty
   public File getBasePersistDirectory()
   {
-    return basePersistDirectory;
+    return Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory not set");
   }
 
-  @JsonProperty
   public VersioningPolicy getVersioningPolicy()
   {
-    return versioningPolicy;
+    return Preconditions.checkNotNull(versioningPolicy, "versioningPolicy not set");
   }
 
   @JsonProperty("rejectionPolicy")
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index 72f0b8bbfe..c240cc4d35 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -35,14 +35,6 @@ import java.util.UUID;
 
 public class RealtimeTuningConfigTest
 {
-  @Test
-  public void testDefaultBasePersistDirectory()
-  {
-    final RealtimeTuningConfig tuningConfig1 = RealtimeTuningConfig.makeDefaultTuningConfig(null);
-    final RealtimeTuningConfig tuningConfig2 = RealtimeTuningConfig.makeDefaultTuningConfig(null);
-    Assert.assertNotEquals(tuningConfig1.getBasePersistDirectory(), tuningConfig2.getBasePersistDirectory());
-  }
-
   @Test
   public void testErrorMessageIsMeaningfulWhenUnableToCreateTemporaryDirectory()
   {
@@ -89,7 +81,6 @@ public class RealtimeTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(0, config.getHandoffConditionTimeout());
     Assert.assertEquals(0, config.getAlertTimeout());
@@ -102,7 +93,7 @@ public class RealtimeTuningConfigTest
     Assert.assertEquals(0, config.getMergeThreadPriority());
     Assert.assertEquals(0, config.getPersistThreadPriority());
     Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());
-    Assert.assertEquals(false, config.isReportParseExceptions());
+    Assert.assertFalse(config.isReportParseExceptions());
   }
 
   @Test
@@ -113,7 +104,6 @@ public class RealtimeTuningConfigTest
                      + "  \"maxRowsInMemory\": 100,\n"
                      + "  \"intermediatePersistPeriod\": \"PT1H\",\n"
                      + "  \"windowPeriod\": \"PT1H\",\n"
-                     + "  \"basePersistDirectory\": \"/tmp/xxx\",\n"
                      + "  \"maxPendingPersists\": 100,\n"
                      + "  \"persistThreadPriority\": 100,\n"
                      + "  \"mergeThreadPriority\": 100,\n"
@@ -136,7 +126,6 @@ public class RealtimeTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getHandoffConditionTimeout());
     Assert.assertEquals(70, config.getAlertTimeout());
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index f2d793e051..dc96e912dc 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -29,16 +29,27 @@ import org.apache.druid.segment.realtime.plumber.NoopRejectionPolicyFactory;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class AppenderatorPlumberTest
 {
-  private final AppenderatorPlumber plumber;
-  private final StreamAppenderatorTester streamAppenderatorTester;
+  private AppenderatorPlumber plumber;
+  private StreamAppenderatorTester streamAppenderatorTester;
 
-  public AppenderatorPlumberTest() throws Exception
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception
   {
-    this.streamAppenderatorTester = new StreamAppenderatorTester(10);
+    this.streamAppenderatorTester =
+        new StreamAppenderatorTester.Builder()
+            .maxRowsInMemory(10)
+            .basePersistDirectory(temporaryFolder.newFolder())
+            .build();
     DataSegmentAnnouncer segmentAnnouncer = EasyMock
         .createMock(DataSegmentAnnouncer.class);
     segmentAnnouncer.announceSegment(EasyMock.anyObject());
@@ -60,7 +71,7 @@ public class AppenderatorPlumberTest
                 EasyMock.anyObject(),
                 EasyMock.anyObject(),
                 EasyMock.anyObject())).andReturn(true).anyTimes();
-    
+
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
         null,
         1,
@@ -68,7 +79,7 @@ public class AppenderatorPlumberTest
         null,
         null,
         null,
-        null,
+        temporaryFolder.newFolder(),
         new IntervalStartVersioningPolicy(),
         new NoopRejectionPolicyFactory(),
         null,
@@ -88,7 +99,6 @@ public class AppenderatorPlumberTest
                                            tuningConfig, streamAppenderatorTester.getMetrics(),
                                            segmentAnnouncer, segmentPublisher, handoffNotifier,
                                            streamAppenderatorTester.getAppenderator());
-
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index b6154a6afe..476ecded7e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -50,7 +50,9 @@ import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -100,14 +102,21 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
   private StreamAppenderatorDriver driver;
   private DataSegmentKiller dataSegmentKiller;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   static {
     NullHandling.initializeForTests();
   }
 
   @Before
-  public void setUp()
+  public void setUp() throws Exception
   {
-    streamAppenderatorTester = new StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
+    streamAppenderatorTester =
+        new StreamAppenderatorTester.Builder()
+            .maxRowsInMemory(MAX_ROWS_IN_MEMORY)
+            .basePersistDirectory(temporaryFolder.newFolder())
+            .build();
     allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
     segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
     dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index bc123b1896..2e05cb9053 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -51,7 +51,9 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -69,10 +71,17 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
       si("2001/2002", "A", 0)
   );
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   @Test
   public void testSimpleIngestion() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (final StreamAppenderatorTester tester =
+             new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                   .enablePushFailure(true)
+                                                   .basePersistDirectory(temporaryFolder.newFolder())
+                                                   .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       boolean thrown;
 
@@ -128,7 +137,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
           committerSupplier.get(),
           false
       ).get();
-      Assert.assertEquals(ImmutableMap.of("x", "3"), (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata());
+      Assert.assertEquals(
+          ImmutableMap.of("x", "3"),
+          (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
+      );
       Assert.assertEquals(
           IDENTIFIERS.subList(0, 2),
           sorted(
@@ -157,15 +169,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
-            100,
-            1024,
-            null,
-            true,
-            new SimpleRowIngestionMeters(),
-            true
-        )
-    ) {
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(1024)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .skipBytesInMemoryOverheadCheck(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -209,15 +219,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
-            100,
-            1024,
-            null,
-            true,
-            new SimpleRowIngestionMeters(),
-            true
-        )
-    ) {
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(1024)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .skipBytesInMemoryOverheadCheck(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -257,7 +265,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxBytesInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 15000, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(15000)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -363,7 +377,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test(expected = RuntimeException.class)
   public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 5180, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(5180)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -394,15 +414,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
-            100,
-            10,
-            null,
-            true,
-            new SimpleRowIngestionMeters(),
-            true
-        )
-    ) {
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(10)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .skipBytesInMemoryOverheadCheck(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -443,7 +461,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 10000, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(10000)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -488,7 +512,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxBytesInMemoryInMultipleSinks() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 31100, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(31100)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -628,7 +658,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testIgnoreMaxBytesInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, -1, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(-1)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -676,7 +712,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxRowsInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = new Supplier<Committer>()
@@ -727,7 +768,11 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -774,7 +819,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testRestoreFromDisk() throws Exception
   {
     final RealtimeTuningConfig tuningConfig;
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       tuningConfig = tester.getTuningConfig();
 
@@ -816,12 +866,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
       appenderator.close();
 
-      try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
-          2,
-          -1,
-          tuningConfig.getBasePersistDirectory(),
-          true
-      )) {
+      try (
+          final StreamAppenderatorTester tester2 =
+              new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                    .basePersistDirectory(tuningConfig.getBasePersistDirectory())
+                                                    .enablePushFailure(true)
+                                                    .build()) {
         final Appenderator appenderator2 = tester2.getAppenderator();
         Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
         Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
@@ -833,7 +883,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test(timeout = 60_000L)
   public void testTotalRowCount() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
       final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
@@ -876,7 +931,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testVerifyRowIngestionMetrics() throws Exception
   {
     final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(5)
+                                                  .maxSizeInBytes(10000L)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .rowIngestionMeters(rowIngestionMeters)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       appenderator.startJob();
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
@@ -892,7 +953,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testQueryByIntervals() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
 
       appenderator.startJob();
@@ -1028,7 +1094,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testQueryBySegments() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
 
       appenderator.startJob();
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 808d217f4e..413f315f50 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
@@ -87,57 +88,10 @@ public class StreamAppenderatorTester implements AutoCloseable
   private final ObjectMapper objectMapper;
   private final Appenderator appenderator;
   private final ExecutorService queryExecutor;
-  private final IndexIO indexIO;
-  private final IndexMergerV9 indexMerger;
   private final ServiceEmitter emitter;
 
   private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
 
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory
-  )
-  {
-    this(maxRowsInMemory, -1, null, false);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final boolean enablePushFailure
-  )
-  {
-    this(maxRowsInMemory, -1, null, enablePushFailure);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final boolean enablePushFailure
-  )
-  {
-    this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final File basePersistDirectory,
-      final boolean enablePushFailure
-  )
-  {
-    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final File basePersistDirectory,
-      final boolean enablePushFailure,
-      final RowIngestionMeters rowIngestionMeters
-  )
-  {
-    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
-  }
-
   public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final long maxSizeInBytes,
@@ -199,7 +153,7 @@ public class StreamAppenderatorTester implements AutoCloseable
     metrics = new FireDepartmentMetrics();
     queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
 
-    indexIO = new IndexIO(
+    IndexIO indexIO = new IndexIO(
         objectMapper,
         new ColumnConfig()
         {
@@ -210,7 +164,12 @@ public class StreamAppenderatorTester implements AutoCloseable
           }
         }
     );
-    indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+
+    IndexMergerV9 indexMerger = new IndexMergerV9(
+        objectMapper,
+        indexIO,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+    );
 
     emitter = new ServiceEmitter(
         "test",
@@ -342,4 +301,62 @@ public class StreamAppenderatorTester implements AutoCloseable
     emitter.close();
     FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
   }
+
+  public static class Builder
+  {
+    private int maxRowsInMemory;
+    private long maxSizeInBytes = -1;
+    private File basePersistDirectory;
+    private boolean enablePushFailure;
+    private RowIngestionMeters rowIngestionMeters;
+    private boolean skipBytesInMemoryOverheadCheck;
+
+    public Builder maxRowsInMemory(final int maxRowsInMemory)
+    {
+      this.maxRowsInMemory = maxRowsInMemory;
+      return this;
+    }
+
+    public Builder maxSizeInBytes(final long maxSizeInBytes)
+    {
+      this.maxSizeInBytes = maxSizeInBytes;
+      return this;
+    }
+
+    public Builder basePersistDirectory(final File basePersistDirectory)
+    {
+      this.basePersistDirectory = basePersistDirectory;
+      return this;
+    }
+
+    public Builder enablePushFailure(final boolean enablePushFailure)
+    {
+      this.enablePushFailure = enablePushFailure;
+      return this;
+    }
+
+    public Builder rowIngestionMeters(final RowIngestionMeters rowIngestionMeters)
+    {
+      this.rowIngestionMeters = rowIngestionMeters;
+      return this;
+    }
+
+    public Builder skipBytesInMemoryOverheadCheck(final boolean skipBytesInMemoryOverheadCheck)
+    {
+      this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
+      return this;
+    }
+
+    public StreamAppenderatorTester build()
+    {
+      return new StreamAppenderatorTester(
+          maxRowsInMemory,
+          maxSizeInBytes,
+          Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),
+          enablePushFailure,
+          rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters,
+          skipBytesInMemoryOverheadCheck
+      );
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index dcf697a821..eeea2a930e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -68,7 +68,9 @@ import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -119,6 +121,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
   private FireDepartmentMetrics metrics;
   private File tmpDir;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   public RealtimePlumberSchoolTest(
       RejectionPolicyFactory rejectionPolicy,
       SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
@@ -207,7 +212,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
         null,
         null,
         null,
-        null,
+        temporaryFolder.newFolder(),
         new IntervalStartVersioningPolicy(),
         rejectionPolicy,
         null,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org