Posted to commits@druid.apache.org by ma...@apache.org on 2021/05/11 21:34:57 UTC

[druid] branch master updated: Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e5048e  Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)
8e5048e is described below

commit 8e5048e643f95c25cc95be91317475fecbdce456
Author: Agustin Gonzalez <ag...@imply.io>
AuthorDate: Tue May 11 14:34:26 2021 -0700

    Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)
    
    * Avoid mapping hydrants in create segments phase for native ingestion
    
    * Drop queriable indices after a given sink is fully merged
    
    * Do not drop memory mappings for realtime ingestion
    
    * Style fixes
    
    * Renamed to match use case better
    
    * Roll back memoization code and use the real time flag instead
    
    * Null ptr fix in FireHydrant toString plus adjustments to memory pressure tracking calculations
    
    * Style
    
    * Log some count stats
    
    * Make sure sinks size is obtained at the right time
    
    * BatchAppenderator unit test
    
    * Fix comment typos
    
    * Renamed methods to make them more readable
    
    * Move persisted metadata from FireHydrant class to AppenderatorImpl. Removed superfluous differences and fixed a comment typo. Removed custom comparator
    
    * Missing dependency
    
    * Make persisted hydrant metadata map concurrent and better reflect the fact that keys are Java references. Maintain persisted metadata when dropping/closing segments.
    
    * Replaced concurrent variables with normal ones
    
    * Added batchMemoryMappedIndex "fallback" flag with default "false". Set this to "true" to make the code fall back to the previous code path.
    
    * Style fix.
    
    * Added note to new setting in doc, using Iterables.size (and removing a dependency), and fixing a typo in a comment.
    
    * Forgot to commit this edited documentation message
---
 docs/configuration/index.md                        |   1 +
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   1 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   1 +
 .../druid/indexing/common/config/TaskConfig.java   |  14 +-
 .../indexing/common/task/BatchAppenderators.java   |   3 +-
 .../appenderator/BatchAppenderatorTest.java        | 233 ++++++++++++++++
 .../appenderator/BatchAppenderatorTester.java      | 293 +++++++++++++++++++++
 .../druid/indexing/common/TaskToolboxTest.java     |   1 +
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   1 +
 .../common/task/CompactionTaskRunTest.java         |   2 +-
 .../indexing/common/task/CompactionTaskTest.java   |   2 +-
 .../druid/indexing/common/task/HadoopTaskTest.java |   1 +
 .../indexing/common/task/IngestionTestBase.java    |   2 +-
 .../common/task/RealtimeIndexTaskTest.java         |   1 +
 .../common/task/TestAppenderatorsManager.java      |   6 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |   5 +-
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   1 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   2 +-
 .../indexing/worker/WorkerTaskManagerTest.java     |   1 +
 .../indexing/worker/WorkerTaskMonitorTest.java     |   1 +
 .../IntermediaryDataManagerAutoCleanupTest.java    |   1 +
 ...ermediaryDataManagerManualAddAndDeleteTest.java |   1 +
 .../shuffle/ShuffleDataSegmentPusherTest.java      |   1 +
 .../worker/shuffle/ShuffleResourceTest.java        |   1 +
 .../apache/druid/segment/realtime/FireHydrant.java |   2 +-
 .../realtime/appenderator/Appenderator.java        |  12 +-
 .../realtime/appenderator/AppenderatorImpl.java    | 133 +++++++++-
 .../realtime/appenderator/Appenderators.java       |   9 +-
 .../appenderator/AppenderatorsManager.java         |   3 +-
 .../DefaultOfflineAppenderatorFactory.java         |   3 +-
 .../DummyForInjectionAppenderatorsManager.java     |   3 +-
 .../appenderator/PeonAppenderatorsManager.java     |   6 +-
 .../UnifiedIndexerAppenderatorsManager.java        |   9 +-
 .../druid/segment/realtime/FireHydrantTest.java    |   7 +
 .../StreamAppenderatorDriverFailTest.java          |   6 +
 .../UnifiedIndexerAppenderatorsManagerTest.java    |   3 +-
 36 files changed, 737 insertions(+), 35 deletions(-)
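
In outline, the batch path introduced by this commit persists hydrants without memory mapping them, remembers where each persisted index lives, maps it only when it is actually needed at merge time, and drops the mappings once the sink is fully merged. A condensed, non-verbatim sketch of the AppenderatorImpl changes in the diff below (names match the patch):

    // At persist time: map only for real time tasks; for batch, just record the location.
    if (isRealTime()) {
      indexToPersist.swapSegment(
          new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()));
    } else {
      persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId()));
      indexToPersist.swapSegment(null);
    }

    // After a sink is fully merged and pushed: drop the queryable indexes behind its hydrants.
    for (FireHydrant fireHydrant : sink) {
      fireHydrant.swapSegment(null);
    }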

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index fdb9365..35d6ad6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1332,6 +1332,7 @@ Additional peon configs include:
 |`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
 |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
 |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
+|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not memory map indexes after persisting them, saving heap space. This applies only to batch ingestion, not to streaming. Set this flag to `true` only if a bug is suspected or found in the new batch ingestion code that avoids memory mapping indexes; this falls back to the previous, working but more memory-intensive, code path.|`false`|
 |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
 |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
 |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
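
For reference, the fallback could be enabled in a peon's runtime.properties with a single illustrative line (the default is `false` and should normally be left alone):

    druid.indexer.task.batchMemoryMappedIndex=true
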
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1288104..34ef75c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2782,6 +2782,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
         null,
         null,
         null,
+        false,
         false
     );
     final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 6b49759..19d2445 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2868,6 +2868,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
         null,
         null,
         null,
+        false,
         false
     );
     final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index f0999d5..0285b33 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -76,6 +76,9 @@ public class TaskConfig
   @JsonProperty
   private final boolean ignoreTimestampSpecForDruidInputSource;
 
+  @JsonProperty
+  private final boolean batchMemoryMappedIndex;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -87,7 +90,8 @@ public class TaskConfig
       @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
       @JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
       @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
-      @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
+      @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
+      @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
   )
   {
     this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -113,6 +117,7 @@ public class TaskConfig
       this.shuffleDataLocations = shuffleDataLocations;
     }
     this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
+    this.batchMemoryMappedIndex = batchMemoryMapIndex;
   }
 
   @JsonProperty
@@ -195,6 +200,13 @@ public class TaskConfig
     return ignoreTimestampSpecForDruidInputSource;
   }
 
+  @JsonProperty
+  public boolean getBatchMemoryMappedIndex()
+  {
+    return batchMemoryMappedIndex;
+  }
+
+
   private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
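
Because batchMemoryMappedIndex is the new last constructor argument, every TaskConfig construction site gains a trailing boolean, which accounts for the mechanical `false` additions across the test files below. For example:

    // The final argument is batchMemoryMappedIndex; false keeps the new, leaner batch path.
    TaskConfig taskConfig =
        new TaskConfig(null, null, null, null, null, false, null, null, null, false, false);
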
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index 711f9d4..bff138f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -80,7 +80,8 @@ public final class BatchAppenderators
         toolbox.getIndexIO(),
         toolbox.getIndexMergerV9(),
         rowIngestionMeters,
-        parseExceptionHandler
+        parseExceptionHandler,
+        toolbox.getConfig().getBatchMemoryMappedIndex()
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
new file mode 100644
index 0000000..e441763
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.appenderator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BatchAppenderatorTest extends InitializedNullHandlingTest
+{
+  private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
+      createSegmentId("2000/2001", "A", 0),
+      createSegmentId("2000/2001", "A", 1),
+      createSegmentId("2001/2002", "A", 0)
+  );
+
+  @Test
+  public void testSimpleIngestionWithIndexesNotMapped() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
+                                                                            false,
+                                                                            false)) {
+      final Appenderator appenderator = tester.getAppenderator();
+      boolean thrown;
+
+      // startJob
+      Assert.assertEquals(null, appenderator.startJob());
+
+      // getDataSource
+      Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+      // add
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      Assert.assertEquals(
+          2,
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
+                      .getNumRowsInSegment()
+      );
+
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
+                      .getNumRowsInSegment()
+      );
+
+      // getSegments
+      Assert.assertEquals(IDENTIFIERS.subList(0, 2),
+                          appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
+
+      // getRowCount
+      Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
+      Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
+      thrown = false;
+      try {
+        appenderator.getRowCount(IDENTIFIERS.get(2));
+      }
+      catch (IllegalStateException e) {
+        thrown = true;
+      }
+      Assert.assertTrue(thrown);
+
+      // push all
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          appenderator.getSegments(),
+          null,
+          false
+      ).get();
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 2),
+              Lists.transform(
+                  segmentsAndCommitMetadata.getSegments(),
+                  new Function<DataSegment, SegmentIdWithShardSpec>()
+                  {
+                    @Override
+                    public SegmentIdWithShardSpec apply(DataSegment input)
+                    {
+                      return SegmentIdWithShardSpec.fromDataSegment(input);
+                    }
+                  }
+              ).stream().sorted().collect(Collectors.toList())
+      );
+      Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+                          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
+
+      appenderator.clear();
+      Assert.assertTrue(appenderator.getSegments().isEmpty());
+    }
+  }
+
+  @Test
+  public void testSimpleIngestionWithIndexesMapped() throws Exception
+  {
+    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
+                                                                            false,
+                                                                            true)) {
+      final Appenderator appenderator = tester.getAppenderator();
+      boolean thrown;
+
+      // startJob
+      Assert.assertEquals(null, appenderator.startJob());
+
+      // getDataSource
+      Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+      // add
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
+                      .getNumRowsInSegment()
+      );
+
+      Assert.assertEquals(
+          2,
+          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
+                      .getNumRowsInSegment()
+      );
+
+      Assert.assertEquals(
+          1,
+          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
+                      .getNumRowsInSegment()
+      );
+
+      // getSegments
+      Assert.assertEquals(IDENTIFIERS.subList(0, 2),
+                          appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
+
+      // getRowCount
+      Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
+      Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
+      thrown = false;
+      try {
+        appenderator.getRowCount(IDENTIFIERS.get(2));
+      }
+      catch (IllegalStateException e) {
+        thrown = true;
+      }
+      Assert.assertTrue(thrown);
+
+      // push all
+      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+          appenderator.getSegments(),
+          null,
+          false
+      ).get();
+      Assert.assertEquals(
+          IDENTIFIERS.subList(0, 2),
+          Lists.transform(
+              segmentsAndCommitMetadata.getSegments(),
+              new Function<DataSegment, SegmentIdWithShardSpec>()
+              {
+                @Override
+                public SegmentIdWithShardSpec apply(DataSegment input)
+                {
+                  return SegmentIdWithShardSpec.fromDataSegment(input);
+                }
+              }
+          ).stream().sorted().collect(Collectors.toList())
+      );
+      Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+                          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
+
+      appenderator.clear();
+      Assert.assertTrue(appenderator.getSegments().isEmpty());
+    }
+  }
+  private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
+  {
+    return new SegmentIdWithShardSpec(
+        AppenderatorTester.DATASOURCE,
+        Intervals.of(interval),
+        version,
+        new LinearShardSpec(partitionNum)
+
+    );
+  }
+
+  static InputRow createInputRow(String ts, String dim, Object met)
+  {
+    return new MapBasedInputRow(
+        DateTimes.of(ts).getMillis(),
+        ImmutableList.of("dim"),
+        ImmutableMap.of(
+            "dim",
+            dim,
+            "met",
+            met
+        )
+    );
+  }
+
+
+}
+
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
new file mode 100644
index 0000000..4058aff
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class BatchAppenderatorTester implements AutoCloseable
+{
+  public static final String DATASOURCE = "foo";
+
+  private final DataSchema schema;
+  private final IndexTask.IndexTuningConfig tuningConfig;
+  private final FireDepartmentMetrics metrics;
+  private final DataSegmentPusher dataSegmentPusher;
+  private final ObjectMapper objectMapper;
+  private final Appenderator appenderator;
+  private final IndexIO indexIO;
+  private final IndexMerger indexMerger;
+  private final ServiceEmitter emitter;
+
+  private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final boolean enablePushFailure,
+      boolean batchMemoryMappedIndex
+  )
+  {
+    this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex);
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final File basePersistDirectory,
+      final boolean enablePushFailure,
+      boolean batchMemoryMappedIndex
+  )
+  {
+    this(
+        maxRowsInMemory,
+        maxSizeInBytes,
+        basePersistDirectory,
+        enablePushFailure,
+        new SimpleRowIngestionMeters(),
+        false,
+        batchMemoryMappedIndex
+    );
+  }
+
+  public BatchAppenderatorTester(
+      final int maxRowsInMemory,
+      final long maxSizeInBytes,
+      final File basePersistDirectory,
+      final boolean enablePushFailure,
+      final RowIngestionMeters rowIngestionMeters,
+      final boolean skipBytesInMemoryOverheadCheck,
+      boolean batchMemoryMappedIndex
+  )
+  {
+    objectMapper = new DefaultObjectMapper();
+    objectMapper.registerSubtypes(LinearShardSpec.class);
+
+    final Map<String, Object> parserMap = objectMapper.convertValue(
+        new MapInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("ts", "auto", null),
+                new DimensionsSpec(null, null, null),
+                null,
+                null,
+                null
+            )
+        ),
+        Map.class
+    );
+    schema = new DataSchema(
+        DATASOURCE,
+        parserMap,
+        new AggregatorFactory[]{
+            new CountAggregatorFactory("count"),
+            new LongSumAggregatorFactory("met", "met")
+        },
+        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+        null,
+        objectMapper
+    );
+    tuningConfig = new IndexTask.IndexTuningConfig(
+        null,
+        2,
+        null,
+        maxRowsInMemory,
+        maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
+        false,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        true,
+        null,
+        null,
+        null,
+        null
+    ).withBasePersistDirectory(createNewBasePersistDirectory());
+
+    metrics = new FireDepartmentMetrics();
+
+    indexIO = new IndexIO(
+        objectMapper,
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0;
+          }
+        }
+    );
+    indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+
+    emitter = new ServiceEmitter(
+        "test",
+        "test",
+        new NoopEmitter()
+    );
+    emitter.start();
+    EmittingLogger.registerEmitter(emitter);
+    dataSegmentPusher = new DataSegmentPusher()
+    {
+      private boolean mustFail = true;
+
+      @Deprecated
+      @Override
+      public String getPathForHadoop(String dataSource)
+      {
+        return getPathForHadoop();
+      }
+
+      @Override
+      public String getPathForHadoop()
+      {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
+      {
+        if (enablePushFailure && mustFail) {
+          mustFail = false;
+          throw new IOException("Push failure test");
+        } else if (enablePushFailure) {
+          mustFail = true;
+        }
+        pushedSegments.add(segment);
+        return segment;
+      }
+
+      @Override
+      public Map<String, Object> makeLoadSpec(URI uri)
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+    appenderator = Appenderators.createOffline(
+        schema.getDataSource(),
+        schema,
+        tuningConfig,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        indexIO,
+        indexMerger,
+        rowIngestionMeters,
+        new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+        batchMemoryMappedIndex
+    );
+  }
+
+  private long getDefaultMaxBytesInMemory()
+  {
+    return (Runtime.getRuntime().totalMemory()) / 3;
+  }
+
+  public DataSchema getSchema()
+  {
+    return schema;
+  }
+
+  public IndexTask.IndexTuningConfig getTuningConfig()
+  {
+    return tuningConfig;
+  }
+
+  public FireDepartmentMetrics getMetrics()
+  {
+    return metrics;
+  }
+
+  public DataSegmentPusher getDataSegmentPusher()
+  {
+    return dataSegmentPusher;
+  }
+
+  public ObjectMapper getObjectMapper()
+  {
+    return objectMapper;
+  }
+
+  public Appenderator getAppenderator()
+  {
+    return appenderator;
+  }
+
+  public List<DataSegment> getPushedSegments()
+  {
+    return pushedSegments;
+  }
+
+  @Override
+  public void close() throws Exception
+  {
+    appenderator.close();
+    emitter.close();
+    FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+  }
+
+  private static File createNewBasePersistDirectory()
+  {
+    return FileUtils.createTempDir("druid-batch-persist");
+  }
+}
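
A minimal usage sketch of this tester, mirroring BatchAppenderatorTest above (someIdentifier and someRow are placeholders; try-with-resources closes the appenderator, the emitter, and the persist directory):

    try (BatchAppenderatorTester tester = new BatchAppenderatorTester(2, false, false)) {
      final Appenderator appenderator = tester.getAppenderator();
      appenderator.startJob();
      appenderator.add(someIdentifier, someRow, null);  // placeholders for a real id and row
      appenderator.push(appenderator.getSegments(), null, false).get();
    }
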
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index b5a8d73..6715569 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -112,6 +112,7 @@ public class TaskToolboxTest
             null,
             null,
             null,
+            false,
             false
         ),
         new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 46fa4fa..113d41a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1516,6 +1516,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
         null,
         null,
         null,
+        false,
         false
     );
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 03acabd..a958ee6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -1298,7 +1298,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     );
 
     return new TaskToolbox(
-        new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+        new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
         null,
         createActionClient(task),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c4faa5b..330ccbf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -1747,7 +1747,7 @@ public class CompactionTaskTest
     )
     {
       super(
-          new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+          new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
           null,
           taskActionClient,
           null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index caaeea2..97d6df1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -117,6 +117,7 @@ public class HadoopTaskTest
         null,
         null,
         null,
+        false,
         false
     )).once();
     EasyMock.replay(toolbox);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 4123688..c42bf20 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -314,7 +314,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
         );
 
         final TaskToolbox box = new TaskToolbox(
-            new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+            new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
             new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
             taskActionClient,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 5715662..0a2e34d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -898,6 +898,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
         null,
         null,
         null,
+        false,
         false
     );
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 2a342b7..6e963e6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -105,7 +105,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager
       IndexIO indexIO,
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean batchMemoryMappedIndex
   )
   {
     return Appenderators.createOffline(
@@ -118,7 +119,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager
         indexIO,
         indexMerger,
         rowIngestionMeters,
-        parseExceptionHandler
+        parseExceptionHandler,
+        batchMemoryMappedIndex
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 23b5b9d..a0d5449 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -241,6 +241,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
             null,
             null,
             ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
+            false,
             false
         ),
         null
@@ -597,7 +598,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
 
   public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
   {
-    final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false);
+    final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false);
 
     objectMapper.setInjectableValues(
         new InjectableValues.Std()
@@ -632,7 +633,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
   protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
   {
     return new TaskToolbox(
-        new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+        new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
         new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
         actionClient,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 120c709..c05cdb8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -90,6 +90,7 @@ public class SingleTaskBackgroundRunnerTest
         null,
         null,
         null,
+        false,
         false
     );
     final ServiceEmitter emitter = new NoopServiceEmitter();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 6e82472..7c41ba9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -600,7 +600,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
         new TaskAuditLogConfig(true)
     );
     File tmpDir = temporaryFolder.newFolder();
-    taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false);
+    taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false, false);
 
     return new TaskToolboxFactory(
         taskConfig,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 1bb33f0..2dd94e8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -89,6 +89,7 @@ public class WorkerTaskManagerTest
         null,
         null,
         null,
+        false,
         false
     );
     TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 2a6e793..c184509 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -163,6 +163,7 @@ public class WorkerTaskMonitorTest
         null,
         null,
         null,
+        false,
         false
     );
     TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
index a10776b..9736fc7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
@@ -92,6 +92,7 @@ public class IntermediaryDataManagerAutoCleanupTest
         null,
         null,
         ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
+        false,
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1629932..6e52b30 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -75,6 +75,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
         null,
         null,
         ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)),
+        false,
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index ea6fb5f..f110cc2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -71,6 +71,7 @@ public class ShuffleDataSegmentPusherTest
         null,
         null,
         ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
+        false,
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
index e4327d3..54a6b02 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -97,6 +97,7 @@ public class ShuffleResourceTest
         null,
         null,
         ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
+        false,
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 9c59387..29a8986 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -213,7 +213,7 @@ public class FireHydrant
     // Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints
     // all the rows in the index
     return "FireHydrant{" +
-           "queryable=" + adapter.get().getId() +
+           "queryable=" + (adapter.get() == null ? "null" : adapter.get().getId()) +
            ", count=" + count +
            '}';
   }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 14796df..1d7da70 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -215,6 +215,15 @@ public interface Appenderator extends QuerySegmentWalker
   void closeNow();
 
   /**
+   * Flag to tell internals whether this appenderator is working on behalf of a real time task.
+   * Certain behaviors depend on it. For example, for batch (non-real time) tasks,
+   * physical segments (i.e. hydrants) do not need to memory map their persisted
+   * files. In that case, the code avoids memory mapping them, thus reducing the occurrence
+   * of OOMs.
+   */
+  boolean isRealTime();
+
+  /**
    * Result of {@link Appenderator#add} containing following information
    * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added
    * - int - positive number indicating how many summarized rows exist in this segment so far and
@@ -242,7 +251,8 @@ public interface Appenderator extends QuerySegmentWalker
       return segmentIdentifier;
     }
 
-    int getNumRowsInSegment()
+    @VisibleForTesting
+    public int getNumRowsInSegment()
     {
       return numRowsInSegment;
     }
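
For batch appenderators the per-hydrant heap estimate also shrinks, since persisted hydrants no longer hold memory-mapped column references. Roughly, condensed from the memory-estimate changes at the end of this diff:

    int total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
    if (isRealTime()) {
      // only real time hydrants keep memory-mapped column holders around after persist
      total += hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER
               + hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER
               + ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
    }
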
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 9121e0a..e6cd9c5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.data.input.Committer;
 import org.apache.druid.data.input.InputRow;
@@ -59,6 +60,7 @@ import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -70,6 +72,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.segment.realtime.plumber.Sink;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.Interval;
 
@@ -83,7 +86,9 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -161,6 +166,19 @@ public class AppenderatorImpl implements Appenderator
 
   private volatile Throwable persistError;
 
+  private final boolean isRealTime;
+  /**
+   * This map stores metadata (File, SegmentId) for each persisted hydrant of a batch appenderator,
+   * so that the QueryableIndex associated with a given hydrant can be mapped
+   * at merge time. This is necessary since the batch appenderator does not map the QueryableIndex
+   * at persist time, in order to minimize its memory footprint. The map has to be synchronized since
+   * it may be accessed from multiple threads.
+   * An {@link IdentityHashMap} is used to better reflect the fact that the keys need to be interpreted
+   * with reference semantics.
+   */
+  private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata =
+      Collections.synchronizedMap(new IdentityHashMap<>());
+
   /**
    * This constructor allows the caller to provide its own SinkQuerySegmentWalker.
    *
@@ -183,7 +201,8 @@ public class AppenderatorImpl implements Appenderator
       IndexMerger indexMerger,
       Cache cache,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean isRealTime
   )
   {
     this.myId = id;
@@ -199,6 +218,7 @@ public class AppenderatorImpl implements Appenderator
     this.texasRanger = sinkQuerySegmentWalker;
     this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
     this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+    this.isRealTime = isRealTime;
 
     if (sinkQuerySegmentWalker == null) {
       this.sinkTimeline = new VersionedIntervalTimeline<>(
@@ -339,7 +359,8 @@ public class AppenderatorImpl implements Appenderator
           if (sinkEntry != null) {
             bytesToBePersisted += sinkEntry.getBytesInMemory();
             if (sinkEntry.swappable()) {
-              // After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
+              // After swapping the sink, we use memory mapped segment instead (but only for real time appenderators!).
+              // However, the memory mapped segment still consumes memory.
               // These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
               int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
               bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
@@ -352,10 +373,14 @@ public class AppenderatorImpl implements Appenderator
           // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
           final String alertMessage = StringUtils.format(
               "Task has exceeded safe estimated heap usage limits, failing "
-              + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
+              + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])"
+              + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])",
               sinks.size(),
               sinks.values().stream().mapToInt(Iterables::size).sum(),
-              getTotalRowCount()
+              getTotalRowCount(),
+              bytesCurrentlyInMemory.get(),
+              bytesToBePersisted,
+              maxBytesTuningConfig
           );
           final String errorMessage = StringUtils.format(
               "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
@@ -530,6 +555,9 @@ public class AppenderatorImpl implements Appenderator
           futures.add(abandonSegment(entry.getKey(), entry.getValue(), true));
         }
 
+        // Re-initialize hydrant map:
+        persistedHydrantMetadata.clear();
+
         // Await dropping.
         Futures.allAsList(futures).get();
       }
@@ -558,6 +586,9 @@ public class AppenderatorImpl implements Appenderator
     final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
     int numPersistedRows = 0;
     long bytesPersisted = 0L;
+    MutableLong totalHydrantsCount = new MutableLong();
+    MutableLong totalHydrantsPersisted = new MutableLong();
+    final long totalSinks = sinks.size();
     for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
       final SegmentIdWithShardSpec identifier = entry.getKey();
       final Sink sink = entry.getValue();
@@ -565,21 +596,26 @@ public class AppenderatorImpl implements Appenderator
         throw new ISE("No sink for identifier: %s", identifier);
       }
       final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      totalHydrantsCount.add(hydrants.size());
       currentHydrants.put(identifier.toString(), hydrants.size());
       numPersistedRows += sink.getNumRowsInMemory();
       bytesPersisted += sink.getBytesInMemory();
 
       final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
 
+      // gather hydrants that have not been persisted:
       for (FireHydrant hydrant : hydrants.subList(0, limit)) {
         if (!hydrant.hasSwapped()) {
           log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
           indexesToPersist.add(Pair.of(hydrant, identifier));
+          totalHydrantsPersisted.add(1);
         }
       }
 
       if (sink.swappable()) {
+        // It is swappable. Get the old one to persist it and create a new one:
         indexesToPersist.add(Pair.of(sink.swap(), identifier));
+        totalHydrantsPersisted.add(1);
       }
     }
     log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
@@ -587,6 +623,7 @@ public class AppenderatorImpl implements Appenderator
     final Object commitMetadata = committer == null ? null : committer.getMetadata();
     final Stopwatch runExecStopwatch = Stopwatch.createStarted();
     final Stopwatch persistStopwatch = Stopwatch.createStarted();
+    AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
     final ListenableFuture<Object> future = persistExecutor.submit(
         new Callable<Object>()
         {
@@ -640,6 +677,14 @@ public class AppenderatorImpl implements Appenderator
                                   .distinct()
                                   .collect(Collectors.joining(", "))
               );
+              log.info(
+                  "Persisted stats: processed rows: [%d], persisted rows: [%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]",
+                  rowIngestionMeters.getProcessed(),
+                  totalPersistedRows.get(),
+                  totalSinks,
+                  totalHydrantsCount.longValue(),
+                  totalHydrantsPersisted.longValue()
+              );
 
               // return null if committer is null
               return commitMetadata;
@@ -682,6 +727,7 @@ public class AppenderatorImpl implements Appenderator
   )
   {
     final Map<SegmentIdWithShardSpec, Sink> theSinks = new HashMap<>();
+    AtomicLong pushedHydrantsCount = new AtomicLong();
     for (final SegmentIdWithShardSpec identifier : identifiers) {
       final Sink sink = sinks.get(identifier);
       if (sink == null) {
@@ -691,6 +737,8 @@ public class AppenderatorImpl implements Appenderator
       if (sink.finishWriting()) {
         totalRows.addAndGet(-sink.getNumRows());
       }
+      // count hydrants for stats:
+      pushedHydrantsCount.addAndGet(Iterables.size(sink));
     }
 
     return Futures.transform(
@@ -700,6 +748,10 @@ public class AppenderatorImpl implements Appenderator
         (Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
           final List<DataSegment> dataSegments = new ArrayList<>();
 
+          log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]",
+                   rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get()
+          );
+
           log.debug(
               "Building and pushing segments: %s",
               theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", "))
@@ -723,6 +775,8 @@ public class AppenderatorImpl implements Appenderator
             }
           }
 
+          log.info("Push complete...");
+
           return new SegmentsAndCommitMetadata(dataSegments, commitMetadata);
         },
         pushExecutor
@@ -813,6 +867,34 @@ public class AppenderatorImpl implements Appenderator
       Closer closer = Closer.create();
       try {
         for (FireHydrant fireHydrant : sink) {
+
+          // if batch, swap/persist did not memory map the incremental index, we need it mapped now:
+          if (!isRealTime()) {
+
+            // sanity
+            Pair<File, SegmentId> persistedMetadata = persistedHydrantMetadata.get(fireHydrant);
+            if (persistedMetadata == null) {
+              throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant);
+            }
+
+            File persistedFile = persistedMetadata.lhs;
+            SegmentId persistedSegmentId = persistedMetadata.rhs;
+
+            // sanity:
+            if (persistedFile == null) {
+              throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant);
+            } else if (persistedSegmentId == null) {
+              throw new ISE(
+                  "Persisted segmentId for batch hydrant in file [%s] is null!",
+                  persistedFile.getPath()
+              );
+            }
+            fireHydrant.swapSegment(new QueryableIndexSegment(
+                indexIO.loadIndex(persistedFile),
+                persistedSegmentId
+            ));
+          }
+
           Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
           final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
           log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
@@ -860,6 +942,15 @@ public class AppenderatorImpl implements Appenderator
           5
       );
 
+      if (!isRealTime()) {
+        // Drop the queryable indexes behind the hydrants; they are no longer needed, and their
+        // mapped file references can cause OOMs during merge if enough of them are held open.
+        for (FireHydrant fireHydrant : sink) {
+          fireHydrant.swapSegment(null);
+        }
+      }
+
       final long pushFinishTime = System.nanoTime();
 
       objectMapper.writeValue(descriptorFile, segment);
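
The null swap above is what bounds mapped-file pressure during the merge phase: once a
sink is fully merged, its hydrants' mapped indexes are released instead of being held
until the task ends. A back-of-envelope illustration (the numbers are made up, not
measurements from this patch):

    // Toy arithmetic: retaining every mapped index through the whole merge multiplies
    // the number of live file mappings by the sink count.
    final class MergeMappingPressureSketch
    {
      public static void main(String[] args)
      {
        long sinks = 500;          // illustrative
        long hydrantsPerSink = 20; // illustrative
        System.out.println("mappings retained without the drop: " + (sinks * hydrantsPerSink)); // 10000
        System.out.println("mappings alive with per-sink drop:   " + hydrantsPerSink);          // 20
      }
    }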
@@ -986,6 +1077,13 @@ public class AppenderatorImpl implements Appenderator
     }
   }
 
+  @Override
+  public boolean isRealTime()
+  {
+    return isRealTime;
+  }
+
   private void lockBasePersistDirectory()
   {
     if (basePersistDirLock == null) {
@@ -1303,6 +1401,8 @@ public class AppenderatorImpl implements Appenderator
                 cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
               }
               hydrant.swapSegment(null);
+              // remove hydrant from persisted metadata:
+              persistedHydrantMetadata.remove(hydrant);
             }
 
             if (removeOnDiskData) {
@@ -1417,9 +1517,15 @@ public class AppenderatorImpl implements Appenderator
             numRows
         );
 
-        indexToPersist.swapSegment(
-            new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId())
-        );
+        // Map only when this appenderator is being driven by a real time task:
+        Segment segmentToSwap = null;
+        if (isRealTime()) {
+          segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId());
+        } else {
+          // remember file path & segment id to rebuild the queryable index for merge:
+          persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId()));
+        }
+        indexToPersist.swapSegment(segmentToSwap);
 
         return numRows;
       }
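
This is the persist-side half of the bookkeeping: in batch mode nothing is mapped at
persist time, and only the on-disk location is remembered. A sketch of the supporting
state (the exact map implementation is an assumption; the commit notes only say that
keys are treated as Java references, which points at identity semantics):

    import java.io.File;
    import java.util.IdentityHashMap;
    import java.util.Map;
    import org.apache.druid.java.util.common.Pair;
    import org.apache.druid.segment.realtime.FireHydrant;
    import org.apache.druid.timeline.SegmentId;

    // Identity-keyed: two hydrants share an entry only if they are the same object.
    private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata =
        new IdentityHashMap<>();

    // persist (batch): persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, segmentId))
    // merge:           reload via indexIO.loadIndex(...), then swapSegment(...)
    // drop/close:      persistedHydrantMetadata.remove(hydrant) keeps the map consistent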
@@ -1457,10 +1563,15 @@ public class AppenderatorImpl implements Appenderator
     // These calculations are approximated from actual heap dumps.
     // Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
     // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
-    return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
-           (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
-           (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
-           ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+    int total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
+    if (isRealTime()) {
+      // for realtime, also account for the references into the memory-mapped index (column holders):
+      total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
+               (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+               ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+    }
+    return total;
   }
 
   private int calculateSinkMemoryInUsed(Sink sink)
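
Net effect of the footprint change: an unmapped batch hydrant is charged only its fixed
overhead, since it holds no column holders until merge. A runnable illustration (the
constant values below are placeholders, not the real ROUGH_OVERHEAD_* numbers from
AppenderatorImpl):

    final class HydrantFootprintSketch
    {
      // placeholder values; the real constants live in AppenderatorImpl
      static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
      static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
      static final int ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER = 700;
      static final int ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER = 600;

      static int footprint(boolean realtime, int dimColumns, int metricColumns)
      {
        int total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
        if (realtime) {
          // only a mapped (realtime) hydrant pays for its column holders
          total += dimColumns * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER
                   + metricColumns * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER
                   + ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
        }
        return total;
      }

      public static void main(String[] args)
      {
        System.out.println("batch:    " + footprint(false, 20, 5)); // fixed overhead only
        System.out.println("realtime: " + footprint(true, 20, 5));  // plus per-column holders
      }
    }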
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index c59e4e0..ff8799d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -89,7 +89,8 @@ public class Appenderators
         indexMerger,
         cache,
         rowIngestionMeters,
-        parseExceptionHandler
+        parseExceptionHandler,
+        true
     );
   }
 
@@ -103,7 +104,8 @@ public class Appenderators
       IndexIO indexIO,
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean batchMemoryMappedIndex
   )
   {
     return new AppenderatorImpl(
@@ -119,7 +121,8 @@ public class Appenderators
         indexMerger,
         null,
         rowIngestionMeters,
-        parseExceptionHandler
+        parseExceptionHandler,
+        batchMemoryMappedIndex // Task config (default false); set to true to fall back to the old code path in case of a bug in the new memory-optimization code
     );
   }
 }
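
Note how the new trailing boolean is wired per call site: realtime creation paths pass
true (streaming hydrants must stay queryable after persist), while batch paths thread
the task-level batchMemoryMappedIndex fallback through, defaulting to false. A toy
condensation of the decision (not Druid API):

    // True means "map the index right after persist" (pre-#11123 behavior);
    // batch tasks only do that when the fallback flag is explicitly enabled.
    static boolean shouldMapAfterPersist(boolean isRealTime, boolean batchMemoryMappedIndex)
    {
      return isRealTime || batchMemoryMappedIndex;
    }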
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 76c64d2..c5b22cf 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -98,7 +98,8 @@ public interface AppenderatorsManager
       IndexIO indexIO,
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean batchMemoryMappedIndex
   );
 
   /**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
index 7a0f1dc..8abd0c2 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
@@ -73,7 +73,8 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory
             false,
             config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
             0
-        )
+        ),
+        false
     );
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 87de244..f1e2f3c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -90,7 +90,8 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
       IndexIO indexIO,
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean batchMemoryMappedIndex
   )
   {
     throw new UOE(ERROR_MSG);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 7fa4f4c..88a4f57 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -122,7 +122,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
       IndexIO indexIO,
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean batchMemoryMappedIndex
   )
   {
     // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@@ -139,7 +140,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
           indexIO,
           indexMerger,
           rowIngestionMeters,
-          parseExceptionHandler
+          parseExceptionHandler,
+          batchMemoryMappedIndex
       );
       return batchAppenderator;
     }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 3697098..3780c37 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -189,7 +189,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
           wrapIndexMerger(indexMerger),
           cache,
           rowIngestionMeters,
-          parseExceptionHandler
+          parseExceptionHandler,
+          true
       );
 
       datasourceBundle.addAppenderator(taskId, appenderator);
@@ -208,7 +209,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
       IndexIO indexIO,
       IndexMerger indexMerger,
       RowIngestionMeters rowIngestionMeters,
-      ParseExceptionHandler parseExceptionHandler
+      ParseExceptionHandler parseExceptionHandler,
+      boolean batchMemoryMappedIndex
   )
   {
     synchronized (this) {
@@ -227,7 +229,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
           indexIO,
           wrapIndexMerger(indexMerger),
           rowIngestionMeters,
-          parseExceptionHandler
+          parseExceptionHandler,
+          batchMemoryMappedIndex
       );
       datasourceBundle.addAppenderator(taskId, appenderator);
       return appenderator;
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
index 4f28842..464141a 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
@@ -210,4 +210,11 @@ public class FireHydrantTest extends InitializedNullHandlingTest
         Function.identity()
     );
   }
+
+  @Test
+  public void testToStringWhenSwappedWithNull()
+  {
+    hydrant.swapSegment(null);
+    hydrant.toString();
+  }
 }
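
The test above guards against the NPE fixed in FireHydrant.toString(); it passes as long
as no exception escapes. A slightly stricter variant (not part of this commit, reusing
the same hydrant fixture and assuming org.junit.Assert is available in the test class)
could also assert the result:

    @Test
    public void testToStringWhenSwappedWithNullReturnsUsableString()
    {
      hydrant.swapSegment(null);
      // before the fix this call threw a NullPointerException
      Assert.assertNotNull(hydrant.toString());
    }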
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4f9cd3c..408aa97 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -545,5 +545,11 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
     {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public boolean isRealTime()
+    {
+      return true;
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
index f7c85b2..728e980 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
@@ -84,7 +84,8 @@ public class UnifiedIndexerAppenderatorsManagerTest
       TestHelper.getTestIndexIO(),
       TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()),
       new NoopRowIngestionMeters(),
-      new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0)
+      new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0),
+      false
   );
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org