Posted to commits@druid.apache.org by ma...@apache.org on 2021/05/11 21:34:57 UTC
[druid] branch master updated: Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8e5048e Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)
8e5048e is described below
commit 8e5048e643f95c25cc95be91317475fecbdce456
Author: Agustin Gonzalez <ag...@imply.io>
AuthorDate: Tue May 11 14:34:26 2021 -0700
Avoid memory mapping hydrants after they are persisted & after they are merged for native batch ingestion (#11123)
* Avoid mapping hydrants in create segments phase for native ingestion
* Drop queriable indices after a given sink is fully merged
* Do not drop memory mappings for realtime ingestion
* Style fixes
* Renamed to match use case better
* Rollback memoization code and use the real time flag instead
* Null ptr fix in FireHydrant toString plus adjustments to memory pressure tracking calculations
* Style
* Log some count stats
* Make sure sinks size is obtained at the right time
* BatchAppenderator unit test
* Fix comment typos
* Renamed methods to make them more readable
* Move persisted metadata from FireHydrant class to AppenderatorImpl. Removed superfluous differences and fix comment typo. Removed custom comparator
* Missing dependency
* Make persisted hydrant metadata map concurrent and better reflect the fact that keys are Java references. Maintain persisted metadata when dropping/closing segments.
* Replaced concurrent variables with normal ones
* Added batchMemoryMappedIndex "fallback" flag with default "false". Setting this to "true" makes the code fall back to the previous code path.
* Style fix.
* Added note to new setting in doc, using Iterables.size (and removing a dependency), and fixing a typo in a comment.
* Forgot to commit this edited documentation message
---
docs/configuration/index.md | 1 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 +
.../druid/indexing/common/config/TaskConfig.java | 14 +-
.../indexing/common/task/BatchAppenderators.java | 3 +-
.../appenderator/BatchAppenderatorTest.java | 233 ++++++++++++++++
.../appenderator/BatchAppenderatorTester.java | 293 +++++++++++++++++++++
.../druid/indexing/common/TaskToolboxTest.java | 1 +
.../AppenderatorDriverRealtimeIndexTaskTest.java | 1 +
.../common/task/CompactionTaskRunTest.java | 2 +-
.../indexing/common/task/CompactionTaskTest.java | 2 +-
.../druid/indexing/common/task/HadoopTaskTest.java | 1 +
.../indexing/common/task/IngestionTestBase.java | 2 +-
.../common/task/RealtimeIndexTaskTest.java | 1 +
.../common/task/TestAppenderatorsManager.java | 6 +-
.../AbstractParallelIndexSupervisorTaskTest.java | 5 +-
.../overlord/SingleTaskBackgroundRunnerTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +-
.../indexing/worker/WorkerTaskManagerTest.java | 1 +
.../indexing/worker/WorkerTaskMonitorTest.java | 1 +
.../IntermediaryDataManagerAutoCleanupTest.java | 1 +
...ermediaryDataManagerManualAddAndDeleteTest.java | 1 +
.../shuffle/ShuffleDataSegmentPusherTest.java | 1 +
.../worker/shuffle/ShuffleResourceTest.java | 1 +
.../apache/druid/segment/realtime/FireHydrant.java | 2 +-
.../realtime/appenderator/Appenderator.java | 12 +-
.../realtime/appenderator/AppenderatorImpl.java | 133 +++++++++-
.../realtime/appenderator/Appenderators.java | 9 +-
.../appenderator/AppenderatorsManager.java | 3 +-
.../DefaultOfflineAppenderatorFactory.java | 3 +-
.../DummyForInjectionAppenderatorsManager.java | 3 +-
.../appenderator/PeonAppenderatorsManager.java | 6 +-
.../UnifiedIndexerAppenderatorsManager.java | 9 +-
.../druid/segment/realtime/FireHydrantTest.java | 7 +
.../StreamAppenderatorDriverFailTest.java | 6 +
.../UnifiedIndexerAppenderatorsManagerTest.java | 3 +-
36 files changed, 737 insertions(+), 35 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index fdb9365..35d6ad6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1332,6 +1332,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
+|`druid.indexer.task.batchMemoryMappedIndex`|If false, native batch ingestion will not memory-map indexes, thus saving heap space. This applies only to batch ingestion, not to streaming ingestion. Use this setting only when a bug is suspected or found in the new batch ingestion code that avoids memory mapping indexes; in that case, set it to `true` to fall back to the previous, working but more memory-intensive, code path.|`false`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
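For operators, the new setting would be toggled in the peon runtime properties. A hypothetical snippet (the property name is taken from the table above; everything else is illustrative):

```properties
# Fall back to the pre-#11123 behavior that memory-maps batch indexes
# after persist/merge. Leave unset (default false) to use the new,
# heap-saving code path; set true only if a bug is suspected there.
druid.indexer.task.batchMemoryMappedIndex=true
```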
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1288104..34ef75c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2782,6 +2782,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
+ false,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 6b49759..19d2445 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2868,6 +2868,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
null,
null,
null,
+ false,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index f0999d5..0285b33 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -76,6 +76,9 @@ public class TaskConfig
@JsonProperty
private final boolean ignoreTimestampSpecForDruidInputSource;
+ @JsonProperty
+ private final boolean batchMemoryMappedIndex;
+
@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@@ -87,7 +90,8 @@ public class TaskConfig
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
- @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
+ @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
+ @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -113,6 +117,7 @@ public class TaskConfig
this.shuffleDataLocations = shuffleDataLocations;
}
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
+ this.batchMemoryMappedIndex = batchMemoryMapIndex;
}
@JsonProperty
@@ -195,6 +200,13 @@ public class TaskConfig
return ignoreTimestampSpecForDruidInputSource;
}
+ @JsonProperty
+ public boolean getBatchMemoryMappedIndex()
+ {
+ return batchMemoryMappedIndex;
+ }
+
+
private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
index 711f9d4..bff138f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java
@@ -80,7 +80,8 @@ public final class BatchAppenderators
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ toolbox.getConfig().getBatchMemoryMappedIndex()
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
new file mode 100644
index 0000000..e441763
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.appenderator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BatchAppenderatorTest extends InitializedNullHandlingTest
+{
+ private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
+ createSegmentId("2000/2001", "A", 0),
+ createSegmentId("2000/2001", "A", 1),
+ createSegmentId("2001/2002", "A", 0)
+ );
+
+ @Test
+ public void testSimpleIngestionWithIndexesNotMapped() throws Exception
+ {
+ try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
+ false,
+ false)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ boolean thrown;
+
+ // startJob
+ Assert.assertEquals(null, appenderator.startJob());
+
+ // getDataSource
+ Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+ // add
+ Assert.assertEquals(
+ 1,
+ appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
+ .getNumRowsInSegment()
+ );
+
+ Assert.assertEquals(
+ 2,
+ appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
+ .getNumRowsInSegment()
+ );
+
+ Assert.assertEquals(
+ 1,
+ appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
+ .getNumRowsInSegment()
+ );
+
+ // getSegments
+ Assert.assertEquals(IDENTIFIERS.subList(0, 2),
+ appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
+
+ // getRowCount
+ Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
+ Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
+ thrown = false;
+ try {
+ appenderator.getRowCount(IDENTIFIERS.get(2));
+ }
+ catch (IllegalStateException e) {
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // push all
+ final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+ appenderator.getSegments(),
+ null,
+ false
+ ).get();
+ Assert.assertEquals(
+ IDENTIFIERS.subList(0, 2),
+ Lists.transform(
+ segmentsAndCommitMetadata.getSegments(),
+ new Function<DataSegment, SegmentIdWithShardSpec>()
+ {
+ @Override
+ public SegmentIdWithShardSpec apply(DataSegment input)
+ {
+ return SegmentIdWithShardSpec.fromDataSegment(input);
+ }
+ }
+ ).stream().sorted().collect(Collectors.toList())
+ );
+ Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+ segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
+
+ appenderator.clear();
+ Assert.assertTrue(appenderator.getSegments().isEmpty());
+ }
+ }
+
+ @Test
+ public void testSimpleIngestionWithIndexesMapped() throws Exception
+ {
+ try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2,
+ false,
+ true)) {
+ final Appenderator appenderator = tester.getAppenderator();
+ boolean thrown;
+
+ // startJob
+ Assert.assertEquals(null, appenderator.startJob());
+
+ // getDataSource
+ Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());
+
+ // add
+ Assert.assertEquals(
+ 1,
+ appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
+ .getNumRowsInSegment()
+ );
+
+ Assert.assertEquals(
+ 2,
+ appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
+ .getNumRowsInSegment()
+ );
+
+ Assert.assertEquals(
+ 1,
+ appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
+ .getNumRowsInSegment()
+ );
+
+ // getSegments
+ Assert.assertEquals(IDENTIFIERS.subList(0, 2),
+ appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
+
+ // getRowCount
+ Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
+ Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
+ thrown = false;
+ try {
+ appenderator.getRowCount(IDENTIFIERS.get(2));
+ }
+ catch (IllegalStateException e) {
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // push all
+ final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
+ appenderator.getSegments(),
+ null,
+ false
+ ).get();
+ Assert.assertEquals(
+ IDENTIFIERS.subList(0, 2),
+ Lists.transform(
+ segmentsAndCommitMetadata.getSegments(),
+ new Function<DataSegment, SegmentIdWithShardSpec>()
+ {
+ @Override
+ public SegmentIdWithShardSpec apply(DataSegment input)
+ {
+ return SegmentIdWithShardSpec.fromDataSegment(input);
+ }
+ }
+ ).stream().sorted().collect(Collectors.toList())
+ );
+ Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
+ segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
+
+ appenderator.clear();
+ Assert.assertTrue(appenderator.getSegments().isEmpty());
+ }
+ }
+ private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
+ {
+ return new SegmentIdWithShardSpec(
+ AppenderatorTester.DATASOURCE,
+ Intervals.of(interval),
+ version,
+ new LinearShardSpec(partitionNum)
+
+ );
+ }
+
+ static InputRow createInputRow(String ts, String dim, Object met)
+ {
+ return new MapBasedInputRow(
+ DateTimes.of(ts).getMillis(),
+ ImmutableList.of("dim"),
+ ImmutableMap.of(
+ "dim",
+ dim,
+ "met",
+ met
+ )
+ );
+ }
+
+
+}
+
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
new file mode 100644
index 0000000..4058aff
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.ParseExceptionHandler;
+import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class BatchAppenderatorTester implements AutoCloseable
+{
+ public static final String DATASOURCE = "foo";
+
+ private final DataSchema schema;
+ private final IndexTask.IndexTuningConfig tuningConfig;
+ private final FireDepartmentMetrics metrics;
+ private final DataSegmentPusher dataSegmentPusher;
+ private final ObjectMapper objectMapper;
+ private final Appenderator appenderator;
+ private final IndexIO indexIO;
+ private final IndexMerger indexMerger;
+ private final ServiceEmitter emitter;
+
+ private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
+
+ public BatchAppenderatorTester(
+ final int maxRowsInMemory,
+ final boolean enablePushFailure,
+ boolean batchMemoryMappedIndex
+ )
+ {
+ this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex);
+ }
+
+ public BatchAppenderatorTester(
+ final int maxRowsInMemory,
+ final long maxSizeInBytes,
+ final File basePersistDirectory,
+ final boolean enablePushFailure,
+ boolean batchMemoryMappedIndex
+ )
+ {
+ this(
+ maxRowsInMemory,
+ maxSizeInBytes,
+ basePersistDirectory,
+ enablePushFailure,
+ new SimpleRowIngestionMeters(),
+ false,
+ batchMemoryMappedIndex
+ );
+ }
+
+ public BatchAppenderatorTester(
+ final int maxRowsInMemory,
+ final long maxSizeInBytes,
+ final File basePersistDirectory,
+ final boolean enablePushFailure,
+ final RowIngestionMeters rowIngestionMeters,
+ final boolean skipBytesInMemoryOverheadCheck,
+ boolean batchMemoryMappedIndex
+ )
+ {
+ objectMapper = new DefaultObjectMapper();
+ objectMapper.registerSubtypes(LinearShardSpec.class);
+
+ final Map<String, Object> parserMap = objectMapper.convertValue(
+ new MapInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("ts", "auto", null),
+ new DimensionsSpec(null, null, null),
+ null,
+ null,
+ null
+ )
+ ),
+ Map.class
+ );
+ schema = new DataSchema(
+ DATASOURCE,
+ parserMap,
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("met", "met")
+ },
+ new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+ null,
+ objectMapper
+ );
+ tuningConfig = new IndexTask.IndexTuningConfig(
+ null,
+ 2,
+ null,
+ maxRowsInMemory,
+ maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
+ false,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ true,
+ null,
+ null,
+ null,
+ null
+ ).withBasePersistDirectory(createNewBasePersistDirectory());
+
+ metrics = new FireDepartmentMetrics();
+
+ indexIO = new IndexIO(
+ objectMapper,
+ new ColumnConfig()
+ {
+ @Override
+ public int columnCacheSizeBytes()
+ {
+ return 0;
+ }
+ }
+ );
+ indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+
+ emitter = new ServiceEmitter(
+ "test",
+ "test",
+ new NoopEmitter()
+ );
+ emitter.start();
+ EmittingLogger.registerEmitter(emitter);
+ dataSegmentPusher = new DataSegmentPusher()
+ {
+ private boolean mustFail = true;
+
+ @Deprecated
+ @Override
+ public String getPathForHadoop(String dataSource)
+ {
+ return getPathForHadoop();
+ }
+
+ @Override
+ public String getPathForHadoop()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
+ {
+ if (enablePushFailure && mustFail) {
+ mustFail = false;
+ throw new IOException("Push failure test");
+ } else if (enablePushFailure) {
+ mustFail = true;
+ }
+ pushedSegments.add(segment);
+ return segment;
+ }
+
+ @Override
+ public Map<String, Object> makeLoadSpec(URI uri)
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ appenderator = Appenderators.createOffline(
+ schema.getDataSource(),
+ schema,
+ tuningConfig,
+ metrics,
+ dataSegmentPusher,
+ objectMapper,
+ indexIO,
+ indexMerger,
+ rowIngestionMeters,
+ new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0),
+ batchMemoryMappedIndex
+ );
+ }
+
+ private long getDefaultMaxBytesInMemory()
+ {
+ return (Runtime.getRuntime().totalMemory()) / 3;
+ }
+
+ public DataSchema getSchema()
+ {
+ return schema;
+ }
+
+ public IndexTask.IndexTuningConfig getTuningConfig()
+ {
+ return tuningConfig;
+ }
+
+ public FireDepartmentMetrics getMetrics()
+ {
+ return metrics;
+ }
+
+ public DataSegmentPusher getDataSegmentPusher()
+ {
+ return dataSegmentPusher;
+ }
+
+ public ObjectMapper getObjectMapper()
+ {
+ return objectMapper;
+ }
+
+ public Appenderator getAppenderator()
+ {
+ return appenderator;
+ }
+
+ public List<DataSegment> getPushedSegments()
+ {
+ return pushedSegments;
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ appenderator.close();
+ emitter.close();
+ FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
+ }
+
+ private static File createNewBasePersistDirectory()
+ {
+ return FileUtils.createTempDir("druid-batch-persist");
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index b5a8d73..6715569 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -112,6 +112,7 @@ public class TaskToolboxTest
null,
null,
null,
+ false,
false
),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 46fa4fa..113d41a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1516,6 +1516,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
null,
null,
null,
+ false,
false
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 03acabd..a958ee6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -1298,7 +1298,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
return new TaskToolbox(
- new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+ new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
null,
createActionClient(task),
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c4faa5b..330ccbf 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -1747,7 +1747,7 @@ public class CompactionTaskTest
)
{
super(
- new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+ new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
null,
taskActionClient,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index caaeea2..97d6df1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -117,6 +117,7 @@ public class HadoopTaskTest
null,
null,
null,
+ false,
false
)).once();
EasyMock.replay(toolbox);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 4123688..c42bf20 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -314,7 +314,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest
);
final TaskToolbox box = new TaskToolbox(
- new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+ new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
taskActionClient,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 5715662..0a2e34d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -898,6 +898,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
null,
null,
null,
+ false,
false
);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 2a342b7..6e963e6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -105,7 +105,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean batchMemoryMappedIndex
)
{
return Appenderators.createOffline(
@@ -118,7 +119,8 @@ public class TestAppenderatorsManager implements AppenderatorsManager
indexIO,
indexMerger,
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ batchMemoryMappedIndex
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 23b5b9d..a0d5449 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -241,6 +241,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
+ false,
false
),
null
@@ -597,7 +598,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO)
{
- final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false);
+ final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false);
objectMapper.setInjectableValues(
new InjectableValues.Std()
@@ -632,7 +633,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
protected TaskToolbox createTaskToolbox(Task task, TaskActionClient actionClient) throws IOException
{
return new TaskToolbox(
- new TaskConfig(null, null, null, null, null, false, null, null, null, false),
+ new TaskConfig(null, null, null, null, null, false, null, null, null, false, false),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
actionClient,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 120c709..c05cdb8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -90,6 +90,7 @@ public class SingleTaskBackgroundRunnerTest
null,
null,
null,
+ false,
false
);
final ServiceEmitter emitter = new NoopServiceEmitter();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 6e82472..7c41ba9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -600,7 +600,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new TaskAuditLogConfig(true)
);
File tmpDir = temporaryFolder.newFolder();
- taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false);
+ taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false, false);
return new TaskToolboxFactory(
taskConfig,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 1bb33f0..2dd94e8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -89,6 +89,7 @@ public class WorkerTaskManagerTest
null,
null,
null,
+ false,
false
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 2a6e793..c184509 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -163,6 +163,7 @@ public class WorkerTaskMonitorTest
null,
null,
null,
+ false,
false
);
TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
index a10776b..9736fc7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java
@@ -92,6 +92,7 @@ public class IntermediaryDataManagerAutoCleanupTest
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
+ false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
index 1629932..6e52b30 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -75,6 +75,7 @@ public class IntermediaryDataManagerManualAddAndDeleteTest
null,
null,
ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)),
+ false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index ea6fb5f..f110cc2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -71,6 +71,7 @@ public class ShuffleDataSegmentPusherTest
null,
null,
ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)),
+ false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
index e4327d3..54a6b02 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -97,6 +97,7 @@ public class ShuffleResourceTest
null,
null,
ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)),
+ false,
false
);
final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
index 9c59387..29a8986 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java
@@ -213,7 +213,7 @@ public class FireHydrant
// Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints
// all the rows in the index
return "FireHydrant{" +
- "queryable=" + adapter.get().getId() +
+ "queryable=" + (adapter.get() == null ? "null" : adapter.get().getId()) +
", count=" + count +
'}';
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 14796df..1d7da70 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -215,6 +215,15 @@ public interface Appenderator extends QuerySegmentWalker
void closeNow();
/**
+ * Flag to tell internals whether this appenderator is working on behalf of a real-time task.
+ * Certain behaviors depend on it. For example, for batch (non-real-time) tasks,
+ * physical segments (i.e. hydrants) do not need to memory map their persisted
+ * files. In that case, the code avoids memory mapping them, reducing the occurrence
+ * of OOMs.
+ */
+ boolean isRealTime();
+
+ /**
* Result of {@link Appenderator#add} containing following information
* - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added
* - int - positive number indicating how many summarized rows exist in this segment so far and
@@ -242,7 +251,8 @@ public interface Appenderator extends QuerySegmentWalker
return segmentIdentifier;
}
- int getNumRowsInSegment()
+ @VisibleForTesting
+ public int getNumRowsInSegment()
{
return numRowsInSegment;
}
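[Editor's note: a minimal, hypothetical sketch (illustrative names only, not Druid code) of the two persist-time behaviors the `isRealTime()` contract above selects between: real-time keeps a mapped segment reference so the hydrant stays queryable, while batch keeps only on-disk metadata and swaps in null.]

```java
// Hypothetical sketch, not Druid code: the persist-time decision driven by isRealTime().
public class PersistDecisionSketch
{
    public static final class PersistOutcome
    {
        public final Object swappedSegment;  // stand-in for QueryableIndexSegment; null for batch
        public final String rememberedFile;  // non-null only for batch

        PersistOutcome(Object segment, String file)
        {
            this.swappedSegment = segment;
            this.rememberedFile = file;
        }
    }

    public static PersistOutcome afterPersist(boolean isRealTime, String persistedFile)
    {
        if (isRealTime) {
            // real time: memory map now so the hydrant remains queryable during ingestion
            return new PersistOutcome(new Object() /* mapped segment */, null);
        }
        // batch: defer mapping until merge; just remember where the persisted index lives
        return new PersistOutcome(null, persistedFile);
    }
}
```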
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 9121e0a..e6cd9c5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang.mutable.MutableLong;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
@@ -59,6 +60,7 @@ import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
@@ -70,6 +72,7 @@ import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.plumber.Sink;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
@@ -83,7 +86,9 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -161,6 +166,19 @@ public class AppenderatorImpl implements Appenderator
private volatile Throwable persistError;
+ private final boolean isRealTime;
+ /**
+ * Use this map to store metadata (File, SegmentId) for a hydrant for the batch appenderator,
+ * in order to facilitate mapping the QueryableIndex associated with a given hydrant
+ * at merge time. This is necessary since the batch appenderator does not map the QueryableIndex
+ * at persist time, in order to minimize its memory footprint. Access has to be synchronized since the
+ * map may be accessed from multiple threads.
+ * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted
+ * with reference semantics.
+ */
+ private final Map<FireHydrant, Pair<File, SegmentId>> persistedHydrantMetadata =
+ Collections.synchronizedMap(new IdentityHashMap<>());
+
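[Editor's note: the `IdentityHashMap` choice above matters because lookups must find the exact `FireHydrant` instance, not an object that merely compares equal. A small self-contained demo (not Druid code; the `Key` class is a stand-in for a hydrant-like key) of the reference-semantics difference, wrapped in `Collections.synchronizedMap` as in the field above:]

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityMapDemo
{
    // Stand-in for a hydrant-like key: equal by value, distinct by reference.
    public static final class Key
    {
        final int id;
        Key(int id) { this.id = id; }
        @Override public boolean equals(Object o) { return o instanceof Key && ((Key) o).id == id; }
        @Override public int hashCode() { return id; }
    }

    public static boolean identityLookupMisses()
    {
        // Wrapped in synchronizedMap, mirroring persistedHydrantMetadata above.
        Map<Key, String> map = Collections.synchronizedMap(new IdentityHashMap<>());
        Key original = new Key(1);
        map.put(original, "persisted-file-path");
        // An equal-but-distinct key does NOT hit: IdentityHashMap compares with == not equals().
        return map.get(new Key(1)) == null && "persisted-file-path".equals(map.get(original));
    }
}
```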
/**
* This constructor allows the caller to provide its own SinkQuerySegmentWalker.
*
@@ -183,7 +201,8 @@ public class AppenderatorImpl implements Appenderator
IndexMerger indexMerger,
Cache cache,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean isRealTime
)
{
this.myId = id;
@@ -199,6 +218,7 @@ public class AppenderatorImpl implements Appenderator
this.texasRanger = sinkQuerySegmentWalker;
this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters");
this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler");
+ this.isRealTime = isRealTime;
if (sinkQuerySegmentWalker == null) {
this.sinkTimeline = new VersionedIntervalTimeline<>(
@@ -339,7 +359,8 @@ public class AppenderatorImpl implements Appenderator
if (sinkEntry != null) {
bytesToBePersisted += sinkEntry.getBytesInMemory();
if (sinkEntry.swappable()) {
- // After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
+ // After swapping the sink, we use a memory-mapped segment instead (but only for real-time appenderators!).
+ // However, the memory-mapped segment still consumes memory.
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
@@ -352,10 +373,14 @@ public class AppenderatorImpl implements Appenderator
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
final String alertMessage = StringUtils.format(
"Task has exceeded safe estimated heap usage limits, failing "
- + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
+ + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])"
+ + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])",
sinks.size(),
sinks.values().stream().mapToInt(Iterables::size).sum(),
- getTotalRowCount()
+ getTotalRowCount(),
+ bytesCurrentlyInMemory.get(),
+ bytesToBePersisted,
+ maxBytesTuningConfig
);
final String errorMessage = StringUtils.format(
"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
@@ -530,6 +555,9 @@ public class AppenderatorImpl implements Appenderator
futures.add(abandonSegment(entry.getKey(), entry.getValue(), true));
}
+ // Re-initialize hydrant map:
+ persistedHydrantMetadata.clear();
+
// Await dropping.
Futures.allAsList(futures).get();
}
@@ -558,6 +586,9 @@ public class AppenderatorImpl implements Appenderator
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
int numPersistedRows = 0;
long bytesPersisted = 0L;
+ MutableLong totalHydrantsCount = new MutableLong();
+ MutableLong totalHydrantsPersisted = new MutableLong();
+ final long totalSinks = sinks.size();
for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
final SegmentIdWithShardSpec identifier = entry.getKey();
final Sink sink = entry.getValue();
@@ -565,21 +596,26 @@ public class AppenderatorImpl implements Appenderator
throw new ISE("No sink for identifier: %s", identifier);
}
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+ totalHydrantsCount.add(hydrants.size());
currentHydrants.put(identifier.toString(), hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
bytesPersisted += sink.getBytesInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
+ // gather hydrants that have not been persisted:
for (FireHydrant hydrant : hydrants.subList(0, limit)) {
if (!hydrant.hasSwapped()) {
log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
indexesToPersist.add(Pair.of(hydrant, identifier));
+ totalHydrantsPersisted.add(1);
}
}
if (sink.swappable()) {
+ // It is swappable. Get the old one to persist it and create a new one:
indexesToPersist.add(Pair.of(sink.swap(), identifier));
+ totalHydrantsPersisted.add(1);
}
}
log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource());
@@ -587,6 +623,7 @@ public class AppenderatorImpl implements Appenderator
final Object commitMetadata = committer == null ? null : committer.getMetadata();
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
+ AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
final ListenableFuture<Object> future = persistExecutor.submit(
new Callable<Object>()
{
@@ -640,6 +677,14 @@ public class AppenderatorImpl implements Appenderator
.distinct()
.collect(Collectors.joining(", "))
);
+ log.info(
+ "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]",
+ rowIngestionMeters.getProcessed(),
+ totalPersistedRows.get(),
+ totalSinks,
+ totalHydrantsCount.longValue(),
+ totalHydrantsPersisted.longValue()
+ );
// return null if committer is null
return commitMetadata;
@@ -682,6 +727,7 @@ public class AppenderatorImpl implements Appenderator
)
{
final Map<SegmentIdWithShardSpec, Sink> theSinks = new HashMap<>();
+ AtomicLong pushedHydrantsCount = new AtomicLong();
for (final SegmentIdWithShardSpec identifier : identifiers) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
@@ -691,6 +737,8 @@ public class AppenderatorImpl implements Appenderator
if (sink.finishWriting()) {
totalRows.addAndGet(-sink.getNumRows());
}
+ // count hydrants for stats:
+ pushedHydrantsCount.addAndGet(Iterables.size(sink));
}
return Futures.transform(
@@ -700,6 +748,10 @@ public class AppenderatorImpl implements Appenderator
(Function<Object, SegmentsAndCommitMetadata>) commitMetadata -> {
final List<DataSegment> dataSegments = new ArrayList<>();
+ log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]",
+ rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get()
+ );
+
log.debug(
"Building and pushing segments: %s",
theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", "))
@@ -723,6 +775,8 @@ public class AppenderatorImpl implements Appenderator
}
}
+ log.info("Push complete...");
+
return new SegmentsAndCommitMetadata(dataSegments, commitMetadata);
},
pushExecutor
@@ -813,6 +867,34 @@ public class AppenderatorImpl implements Appenderator
Closer closer = Closer.create();
try {
for (FireHydrant fireHydrant : sink) {
+
+ // In batch mode, swap/persist did not memory map the incremental index; map it now for the merge:
+ if (!isRealTime()) {
+
+ // sanity
+ Pair<File, SegmentId> persistedMetadata = persistedHydrantMetadata.get(fireHydrant);
+ if (persistedMetadata == null) {
+ throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant);
+ }
+
+ File persistedFile = persistedMetadata.lhs;
+ SegmentId persistedSegmentId = persistedMetadata.rhs;
+
+ // sanity:
+ if (persistedFile == null) {
+ throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant);
+ } else if (persistedSegmentId == null) {
+ throw new ISE(
+ "Persisted segmentId for batch hydrant in file [%s] is null!",
+ persistedFile.getPath()
+ );
+ }
+ fireHydrant.swapSegment(new QueryableIndexSegment(
+ indexIO.loadIndex(persistedFile),
+ persistedSegmentId
+ ));
+ }
+
Pair<ReferenceCountingSegment, Closeable> segmentAndCloseable = fireHydrant.getAndIncrementSegment();
final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex();
log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant);
@@ -860,6 +942,15 @@ public class AppenderatorImpl implements Appenderator
5
);
+ if (!isRealTime()) {
+ // Drop the queryable indexes behind the hydrants... they are not needed anymore and their
+ // mapped file references can generate OOMs during merge if enough of them are held back...
+ for (FireHydrant fireHydrant : sink) {
+ fireHydrant.swapSegment(null);
+ }
+ }
+
final long pushFinishTime = System.nanoTime();
objectMapper.writeValue(descriptorFile, segment);
@@ -986,6 +1077,13 @@ public class AppenderatorImpl implements Appenderator
}
}
+ @Override
+ public boolean isRealTime()
+ {
+ return isRealTime;
+ }
+
+
private void lockBasePersistDirectory()
{
if (basePersistDirLock == null) {
@@ -1303,6 +1401,8 @@ public class AppenderatorImpl implements Appenderator
cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
}
hydrant.swapSegment(null);
+ // remove hydrant from persisted metadata:
+ persistedHydrantMetadata.remove(hydrant);
}
if (removeOnDiskData) {
@@ -1417,9 +1517,15 @@ public class AppenderatorImpl implements Appenderator
numRows
);
- indexToPersist.swapSegment(
- new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId())
- );
+ // Map only when this appenderator is being driven by a real time task:
+ Segment segmentToSwap = null;
+ if (isRealTime()) {
+ segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId());
+ } else {
+ // remember file path & segment id to rebuild the queryable index for merge:
+ persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId()));
+ }
+ indexToPersist.swapSegment(segmentToSwap);
return numRows;
}
@@ -1457,10 +1563,15 @@ public class AppenderatorImpl implements Appenderator
// These calculations are approximated from actual heap dumps.
// Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
// Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
- return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
- (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
- (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
- ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+ int total;
+ total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT;
+ if (isRealTime()) {
+ // for real time, also add the overhead of the memory-mapped segment references:
+ total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
+ (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
+ ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
+ }
+ return total;
}
private int calculateSinkMemoryInUsed(Sink sink)
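[Editor's note: pulling the `AppenderatorImpl` changes above together, the batch bookkeeping has a three-step lifecycle: persist remembers `(file, segmentId)` instead of memory mapping, merge re-resolves that metadata (failing fast if it is missing), and abandoning a segment drops the entry. A simplified, hypothetical model (stub types, not Druid code):]

```java
import java.io.File;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical, simplified model of the batch bookkeeping in AppenderatorImpl.
public class BatchHydrantBookkeeping
{
    public static final class Hydrant { }        // stand-in for FireHydrant
    public static final class Metadata           // stand-in for Pair<File, SegmentId>
    {
        public final File file;
        public final String segmentId;
        Metadata(File file, String segmentId) { this.file = file; this.segmentId = segmentId; }
    }

    private final Map<Hydrant, Metadata> persisted =
        Collections.synchronizedMap(new IdentityHashMap<>());

    public void onPersist(Hydrant h, File f, String segmentId)
    {
        // batch path: do not memory map; just remember where the index lives on disk
        persisted.put(h, new Metadata(f, segmentId));
    }

    public Metadata onMerge(Hydrant h)
    {
        Metadata m = persisted.get(h);
        if (m == null) {
            // mirrors the sanity check in mergeAndPush above
            throw new IllegalStateException("Persisted metadata for batch hydrant is null!");
        }
        return m; // the caller would load and map the index from m.file here
    }

    public void onAbandon(Hydrant h)
    {
        persisted.remove(h); // keep the map from leaking entries for dropped segments
    }

    public int size()
    {
        return persisted.size();
    }
}
```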
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index c59e4e0..ff8799d 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -89,7 +89,8 @@ public class Appenderators
indexMerger,
cache,
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ true
);
}
@@ -103,7 +104,8 @@ public class Appenderators
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean batchMemoryMappedIndex
)
{
return new AppenderatorImpl(
@@ -119,7 +121,8 @@ public class Appenderators
indexMerger,
null,
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ batchMemoryMappedIndex // task config (default false) to fall back to the "old" code path in case of a bug in the new memory optimization code
);
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 76c64d2..c5b22cf 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -98,7 +98,8 @@ public interface AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean batchMemoryMappedIndex
);
/**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
index 7a0f1dc..8abd0c2 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java
@@ -73,7 +73,8 @@ public class DefaultOfflineAppenderatorFactory implements AppenderatorFactory
false,
config.isReportParseExceptions() ? 0 : Integer.MAX_VALUE,
0
- )
+ ),
+ false
);
}
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 87de244..f1e2f3c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -90,7 +90,8 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean batchMemoryMappedIndex
)
{
throw new UOE(ERROR_MSG);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 7fa4f4c..88a4f57 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -122,7 +122,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean batchMemoryMappedIndex
)
{
// CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
@@ -139,7 +140,8 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
indexIO,
indexMerger,
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ batchMemoryMappedIndex
);
return batchAppenderator;
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 3697098..3780c37 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -189,7 +189,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
wrapIndexMerger(indexMerger),
cache,
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ true
);
datasourceBundle.addAppenderator(taskId, appenderator);
@@ -208,7 +209,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
IndexIO indexIO,
IndexMerger indexMerger,
RowIngestionMeters rowIngestionMeters,
- ParseExceptionHandler parseExceptionHandler
+ ParseExceptionHandler parseExceptionHandler,
+ boolean batchMemoryMappedIndex
)
{
synchronized (this) {
@@ -227,7 +229,8 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
indexIO,
wrapIndexMerger(indexMerger),
rowIngestionMeters,
- parseExceptionHandler
+ parseExceptionHandler,
+ batchMemoryMappedIndex
);
datasourceBundle.addAppenderator(taskId, appenderator);
return appenderator;
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
index 4f28842..464141a 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java
@@ -210,4 +210,11 @@ public class FireHydrantTest extends InitializedNullHandlingTest
Function.identity()
);
}
+
+ @Test
+ public void testToStringWhenSwappedWithNull()
+ {
+ hydrant.swapSegment(null);
+ hydrant.toString();
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4f9cd3c..408aa97 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -545,5 +545,11 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport
{
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean isRealTime()
+ {
+ return true;
+ }
+
}
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
index f7c85b2..728e980 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManagerTest.java
@@ -84,7 +84,8 @@ public class UnifiedIndexerAppenderatorsManagerTest
TestHelper.getTestIndexIO(),
TestHelper.getTestIndexMergerV9(OnHeapMemorySegmentWriteOutMediumFactory.instance()),
new NoopRowIngestionMeters(),
- new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0)
+ new ParseExceptionHandler(new NoopRowIngestionMeters(), false, 0, 0),
+ false
);
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org