You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/08/01 02:22:53 UTC
[incubator-druid] branch master updated: Simple memory allocation for CliIndexer tasks (#8201)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 41893d4 Simple memory allocation for CliIndexer tasks (#8201)
41893d4 is described below
commit 41893d46470b6dc0730f57c3176d28946b4fa662
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Wed Jul 31 19:22:41 2019 -0700
Simple memory allocation for CliIndexer tasks (#8201)
* Simple memory allocation for CliIndexer
* PR comments
* Checkstyle
---
.../indexing/overlord/ThreadingTaskRunner.java | 2 +
.../common/task/TestAppenderatorsManager.java | 6 +
.../druid/indexing/worker/config/WorkerConfig.java | 8 +
.../appenderator/AppenderatorsManager.java | 5 +
.../DummyForInjectionAppenderatorsManager.java | 6 +
.../appenderator/PeonAppenderatorsManager.java | 6 +
.../UnifiedIndexerAppenderatorsManager.java | 336 ++++++++++++++++++++-
7 files changed, 363 insertions(+), 6 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index c0bcaf4..3af2ebb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -409,6 +409,8 @@ public class ThreadingTaskRunner
} else {
LOGGER.warn("Ran out of time, not waiting for executor to finish!");
}
+
+ appenderatorsManager.shutdown();
}
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 28b15ea..9018b53 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -144,4 +144,10 @@ public class TestAppenderatorsManager implements AppenderatorsManager
{
return true;
}
+
+  @Override
+  public void shutdown()
+  {
+    // Test double: it owns no executors or other resources, so there is nothing to release.
+  }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
similarity index 90%
rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
rename to server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 4bfa6ee..a501fc8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -49,6 +49,9 @@ public class WorkerConfig
@JsonProperty
private Period intermediaryPartitionTimeout = new Period("P1D");
+  /**
+   * Total heap budget, in bytes, for ingestion on this worker; UnifiedIndexerAppenderatorsManager divides it
+   * evenly by the worker capacity to get each task's limit. Defaults to 60% of {@link Runtime#maxMemory()}.
+   *
+   * Kept non-final, like {@code intermediaryPartitionTimeout}, so Jackson can populate it without depending
+   * on reflective writes to final fields.
+   */
+  @JsonProperty
+  private long globalIngestionHeapLimitBytes = (long) (Runtime.getRuntime().maxMemory() * 0.6);
+
public String getIp()
{
return ip;
@@ -78,4 +81,9 @@ public class WorkerConfig
{
return intermediaryPartitionTimeout;
}
+
+  /**
+   * @return the total ingestion heap budget in bytes for this worker (by default 60% of
+   *     {@link Runtime#maxMemory()}); callers split it among the worker's task slots
+   */
+  public long getGlobalIngestionHeapLimitBytes()
+  {
+    return this.globalIngestionHeapLimitBytes;
+  }
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 06d3f6e..a692419 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -127,4 +127,9 @@ public interface AppenderatorsManager
* Only Tasks running in Peons (i.e., as separate processes) should make their own individual node announcements.
*/
boolean shouldTaskMakeNodeAnnouncements();
+
+ /**
+ * Shut down the AppenderatorsManager, releasing any resources it owns (for example, a shared executor).
+ *
+ * ThreadingTaskRunner invokes this once it has finished (or given up) waiting for its task executor.
+ * Managers with nothing to release implement this as a no-op; the injection-only placeholder throws.
+ */
+ void shutdown();
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 23b4567..8182b32 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -117,4 +117,10 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
{
throw new UOE(ERROR_MSG);
}
+
+ /**
+ * Not supported: like the other methods of this placeholder manager, always throws {@link UOE}.
+ */
+ @Override
+ public void shutdown()
+ {
+ throw new UOE(ERROR_MSG);
+ }
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 7563b15..e96dfd5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -167,4 +167,10 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
{
return true;
}
+
+  /**
+   * Intentionally a no-op: this manager has nothing to shut down.
+   */
+  @Override
+  public void shutdown()
+  {
+  }
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 4aa3593..55cb51f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -21,62 +21,99 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.guice.annotations.Processing;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.IndexableAdapter;
+import org.apache.druid.segment.ProgressIndicator;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;
+import org.joda.time.Period;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/**
- * Manages Appenderators for the Indexer task execution service, which runs all tasks in a single process.
+ * Manages {@link Appenderator} instances for the CliIndexer task execution service, which runs all tasks in
+ * a single process.
*
* This class keeps two maps:
- * - A per-datasource SinkQuerySegmentWalker (with an associated per-datasource timeline)
+ * - A per-datasource {@link SinkQuerySegmentWalker} (with an associated per-datasource timeline)
* - A map that associates a taskId with the Appenderator created for that task
*
* Appenderators created by this class will use the shared per-datasource SinkQuerySegmentWalkers.
*
* The per-datasource SinkQuerySegmentWalkers share a common queryExecutorService.
+ *
+ * Each task that requests an Appenderator from this AppenderatorsManager will receive a heap memory limit
+ * equal to {@link WorkerConfig#getGlobalIngestionHeapLimitBytes()} evenly divided by
+ * {@link WorkerConfig#getCapacity()}. Row-based in-memory limits are disabled for these Appenderators.
+ *
+ * The Appenderators created by this class share an executor pool for {@link IndexMerger} persist
+ * and merge operations, with concurrent operations limited to {@code druid.worker.capacity} divided by 2
+ * (minimum 1). This limit is imposed to reduce overall memory usage.
*/
public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
{
private final ConcurrentHashMap<String, SinkQuerySegmentWalker> datasourceSegmentWalkers = new ConcurrentHashMap<>();
private final ExecutorService queryExecutorService;
+ private final WorkerConfig workerConfig;
private final Cache cache;
private final CacheConfig cacheConfig;
private final CachePopulatorStats cachePopulatorStats;
+ private ListeningExecutorService mergeExecutor;
+
+ /**
+ * @param queryExecutorService executor shared by the per-datasource SinkQuerySegmentWalkers
+ * @param workerConfig source of the global ingestion heap limit and the worker capacity, which size the
+ * per-task memory limit and the shared merge pool
+ */
@Inject
public UnifiedIndexerAppenderatorsManager(
@Processing ExecutorService queryExecutorService,
+ WorkerConfig workerConfig,
Cache cache,
CacheConfig cacheConfig,
CachePopulatorStats cachePopulatorStats
)
{
this.queryExecutorService = queryExecutorService;
+ this.workerConfig = workerConfig;
this.cache = cache;
this.cacheConfig = cacheConfig;
this.cachePopulatorStats = cachePopulatorStats;
+
+ // Allow at most half as many concurrent persists/merges as task slots (but at least one) to bound heap use.
+ // This pool is shared by every Appenderator this manager creates and is released in shutdown().
+ int concurrentMerges = Math.max(1, workerConfig.getCapacity() / 2);
+ this.mergeExecutor = MoreExecutors.listeningDecorator(
+ Execs.multiThreaded(concurrentMerges, "unified-indexer-merge-pool-%d")
+ );
}
@Override
@@ -121,14 +158,14 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
Appenderator appenderator = new AppenderatorImpl(
schema,
- config,
+ rewriteAppenderatorConfigMemoryLimits(config),
metrics,
dataSegmentPusher,
objectMapper,
segmentAnnouncer,
segmentWalker,
indexIO,
- indexMerger,
+ wrapIndexMerger(indexMerger),
cache
);
@@ -149,12 +186,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
{
Appenderator appenderator = Appenderators.createOffline(
schema,
- config,
+ rewriteAppenderatorConfigMemoryLimits(config),
metrics,
dataSegmentPusher,
objectMapper,
indexIO,
- indexMerger
+ wrapIndexMerger(indexMerger)
);
return appenderator;
}
@@ -196,4 +233,291 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
{
return false;
}
+
+  /**
+   * Stops the shared merge pool. Work that is still queued is cancelled so that no task thread waiting on it
+   * inside {@link LimitedPoolIndexMerger} blocks forever. Calling this more than once is harmless.
+   */
+  @Override
+  public void shutdown()
+  {
+    if (mergeExecutor != null) {
+      // shutdownNow() removes queued tasks without completing their futures; callers blocked in
+      // future.get() on those tasks would otherwise hang. The queued tasks are the submitted futures.
+      for (Runnable pending : mergeExecutor.shutdownNow()) {
+        if (pending instanceof Future) {
+          ((Future<?>) pending).cancel(true);
+        }
+      }
+      mergeExecutor = null;
+    }
+  }
+
+ private AppenderatorConfig rewriteAppenderatorConfigMemoryLimits(AppenderatorConfig baseConfig)
+ {
+ long perWorkerLimit = workerConfig.getGlobalIngestionHeapLimitBytes() / workerConfig.getCapacity();
+ return new MemoryParameterOverridingAppenderatorConfig(baseConfig, perWorkerLimit);
+ }
+
+ /**
+ * This is a wrapper around AppenderatorConfig that overrides the {@link AppenderatorConfig#getMaxBytesInMemory()}
+ * and {@link AppenderatorConfig#getMaxRowsInMemory()} parameters.
+ *
+ * Row-based limits are disabled by setting maxRowsInMemory to an essentially unlimited value.
+ * maxBytesInMemory is overridden with the provided value. These overrides replace whatever the user has specified.
+ */
+ private static class MemoryParameterOverridingAppenderatorConfig implements AppenderatorConfig
+ {
+ private final AppenderatorConfig baseConfig;
+ private final long newMaxBytesInMemory;
+
+ public MemoryParameterOverridingAppenderatorConfig(
+ AppenderatorConfig baseConfig,
+ long newMaxBytesInMemory
+ )
+ {
+ this.baseConfig = baseConfig;
+ this.newMaxBytesInMemory = newMaxBytesInMemory;
+ }
+
+ @Override
+ public boolean isReportParseExceptions()
+ {
+ return baseConfig.isReportParseExceptions();
+ }
+
+ @Override
+ public int getMaxRowsInMemory()
+ {
+ return Integer.MAX_VALUE; // unlimited, rely on maxBytesInMemory instead
+ }
+
+ @Override
+ public long getMaxBytesInMemory()
+ {
+ return newMaxBytesInMemory;
+ }
+
+ @Override
+ public int getMaxPendingPersists()
+ {
+ return baseConfig.getMaxPendingPersists();
+ }
+
+ @Nullable
+ @Override
+ public Integer getMaxRowsPerSegment()
+ {
+ return baseConfig.getMaxRowsPerSegment();
+ }
+
+ @Nullable
+ @Override
+ public Long getMaxTotalRows()
+ {
+ return baseConfig.getMaxTotalRows();
+ }
+
+ @Override
+ public Period getIntermediatePersistPeriod()
+ {
+ return baseConfig.getIntermediatePersistPeriod();
+ }
+
+ @Override
+ public IndexSpec getIndexSpec()
+ {
+ return baseConfig.getIndexSpec();
+ }
+
+ @Override
+ public IndexSpec getIndexSpecForIntermediatePersists()
+ {
+ return baseConfig.getIndexSpecForIntermediatePersists();
+ }
+
+ @Override
+ public File getBasePersistDirectory()
+ {
+ return baseConfig.getBasePersistDirectory();
+ }
+
+ @Nullable
+ @Override
+ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+ {
+ return baseConfig.getSegmentWriteOutMediumFactory();
+ }
+ }
+
+  /**
+   * Wraps {@code baseMerger} so that its persist/merge calls run on this manager's shared merge pool.
+   *
+   * @throws IllegalStateException if {@link #shutdown()} has already been called
+   */
+  private IndexMerger wrapIndexMerger(IndexMerger baseMerger)
+  {
+    // shutdown() nulls out mergeExecutor; fail here instead of with an NPE at the first persist.
+    Preconditions.checkState(mergeExecutor != null, "AppenderatorsManager has already been shut down");
+    return new LimitedPoolIndexMerger(baseMerger, mergeExecutor);
+  }
+
+
+ /**
+ * This wrapper around IndexMerger limits concurrent calls to the merge/persist methods used by
+ * {@link AppenderatorImpl} with a shared executor service. Merge/persist methods that are not used by
+ * AppenderatorImpl will throw an exception if called.
+ */
+ public static class LimitedPoolIndexMerger implements IndexMerger
+ {
+ private static final String ERROR_MSG = "Shouldn't be called";
+
+ private final IndexMerger baseMerger;
+
+ private final ListeningExecutorService mergeExecutor;
+
+ public LimitedPoolIndexMerger(
+ IndexMerger baseMerger,
+ ListeningExecutorService mergeExecutor
+ )
+ {
+ this.baseMerger = baseMerger;
+ this.mergeExecutor = mergeExecutor;
+ }
+
+ @Override
+ public File mergeQueryableIndex(
+ List<QueryableIndex> indexes,
+ boolean rollup,
+ AggregatorFactory[] metricAggs,
+ File outDir,
+ IndexSpec indexSpec,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+ ListenableFuture<File> mergeFuture = mergeExecutor.submit(
+ new Callable<File>()
+ {
+ @Override
+ public File call()
+ {
+ try {
+ return baseMerger.mergeQueryableIndex(
+ indexes,
+ rollup,
+ metricAggs,
+ outDir,
+ indexSpec,
+ segmentWriteOutMediumFactory
+ );
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+ );
+
+ try {
+ return mergeFuture.get();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public File persist(
+ IncrementalIndex index,
+ Interval dataInterval,
+ File outDir,
+ IndexSpec indexSpec,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+ ListenableFuture<File> mergeFuture = mergeExecutor.submit(
+ new Callable<File>()
+ {
+ @Override
+ public File call()
+ {
+ try {
+ return baseMerger.persist(
+ index,
+ dataInterval,
+ outDir,
+ indexSpec,
+ segmentWriteOutMediumFactory
+ );
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+ );
+
+ try {
+ return mergeFuture.get();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public File merge(
+ List<IndexableAdapter> indexes,
+ boolean rollup,
+ AggregatorFactory[] metricAggs,
+ File outDir,
+ IndexSpec indexSpec
+ ) throws IOException
+ {
+ throw new UOE(ERROR_MSG);
+ }
+
+ @Override
+ public File convert(
+ File inDir,
+ File outDir,
+ IndexSpec indexSpec
+ ) throws IOException
+ {
+ throw new UOE(ERROR_MSG);
+ }
+
+ @Override
+ public File append(
+ List<IndexableAdapter> indexes,
+ AggregatorFactory[] aggregators,
+ File outDir,
+ IndexSpec indexSpec,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+ throw new UOE(ERROR_MSG);
+ }
+
+ @Override
+ public File persist(
+ IncrementalIndex index,
+ File outDir,
+ IndexSpec indexSpec,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+ throw new UOE(ERROR_MSG);
+ }
+
+ @Override
+ public File persist(
+ IncrementalIndex index,
+ Interval dataInterval,
+ File outDir,
+ IndexSpec indexSpec,
+ ProgressIndicator progress,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+ throw new UOE(ERROR_MSG);
+ }
+
+ @Override
+ public File mergeQueryableIndex(
+ List<QueryableIndex> indexes,
+ boolean rollup,
+ AggregatorFactory[] metricAggs,
+ File outDir,
+ IndexSpec indexSpec,
+ ProgressIndicator progress,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+ ) throws IOException
+ {
+ throw new UOE(ERROR_MSG);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org