You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/08/01 02:22:53 UTC

[incubator-druid] branch master updated: Simple memory allocation for CliIndexer tasks (#8201)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 41893d4  Simple memory allocation for CliIndexer tasks (#8201)
41893d4 is described below

commit 41893d46470b6dc0730f57c3176d28946b4fa662
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Wed Jul 31 19:22:41 2019 -0700

    Simple memory allocation for CliIndexer tasks (#8201)
    
    * Simple memory allocation for CliIndexer
    
    * PR comments
    
    * Checkstyle
---
 .../indexing/overlord/ThreadingTaskRunner.java     |   2 +
 .../common/task/TestAppenderatorsManager.java      |   6 +
 .../druid/indexing/worker/config/WorkerConfig.java |   8 +
 .../appenderator/AppenderatorsManager.java         |   5 +
 .../DummyForInjectionAppenderatorsManager.java     |   6 +
 .../appenderator/PeonAppenderatorsManager.java     |   6 +
 .../UnifiedIndexerAppenderatorsManager.java        | 336 ++++++++++++++++++++-
 7 files changed, 363 insertions(+), 6 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index c0bcaf4..3af2ebb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -409,6 +409,8 @@ public class ThreadingTaskRunner
     } else {
       LOGGER.warn("Ran out of time, not waiting for executor to finish!");
     }
+
+    appenderatorsManager.shutdown();
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 28b15ea..9018b53 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -144,4 +144,10 @@ public class TestAppenderatorsManager implements AppenderatorsManager
   {
     return true;
   }
+
+  @Override
+  public void shutdown()
+  {
+
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
similarity index 90%
rename from indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
rename to server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 4bfa6ee..a501fc8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -49,6 +49,9 @@ public class WorkerConfig
   @JsonProperty
   private Period intermediaryPartitionTimeout = new Period("P1D");
 
+  @JsonProperty
+  private final long globalIngestionHeapLimitBytes = (long) (Runtime.getRuntime().maxMemory() * 0.6);
+
   public String getIp()
   {
     return ip;
@@ -78,4 +81,9 @@ public class WorkerConfig
   {
     return intermediaryPartitionTimeout;
   }
+
+  public long getGlobalIngestionHeapLimitBytes()
+  {
+    return globalIngestionHeapLimitBytes;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 06d3f6e..a692419 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -127,4 +127,9 @@ public interface AppenderatorsManager
    * Only Tasks running in Peons (i.e., as separate processes) should make their own individual node announcements.
    */
   boolean shouldTaskMakeNodeAnnouncements();
+
+  /**
+   * Shut down the AppenderatorsManager.
+   */
+  void shutdown();
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index 23b4567..8182b32 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -117,4 +117,10 @@ public class DummyForInjectionAppenderatorsManager implements AppenderatorsManag
   {
     throw new UOE(ERROR_MSG);
   }
+
+  @Override
+  public void shutdown()
+  {
+    throw new UOE(ERROR_MSG);
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 7563b15..e96dfd5 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -167,4 +167,10 @@ public class PeonAppenderatorsManager implements AppenderatorsManager
   {
     return true;
   }
+
+  @Override
+  public void shutdown()
+  {
+    // nothing to shut down
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 4aa3593..55cb51f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -21,62 +21,99 @@ package org.apache.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.guice.annotations.Processing;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.IndexableAdapter;
+import org.apache.druid.segment.ProgressIndicator;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.Interval;
+import org.joda.time.Period;
 
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 /**
- * Manages Appenderators for the Indexer task execution service, which runs all tasks in a single process.
+ * Manages {@link Appenderator} instances for the CliIndexer task execution service, which runs all tasks in
+ * a single process.
  *
  * This class keeps two maps:
- * - A per-datasource SinkQuerySegmentWalker (with an associated per-datasource timeline)
+ * - A per-datasource {@link SinkQuerySegmentWalker} (with an associated per-datasource timeline)
  * - A map that associates a taskId with the Appenderator created for that task
  *
  * Appenderators created by this class will use the shared per-datasource SinkQuerySegmentWalkers.
  *
  * The per-datasource SinkQuerySegmentWalkers share a common queryExecutorService.
+ *
+ * Each task that requests an Appenderator from this AppenderatorsManager will receive a heap memory limit
+ * equal to {@link WorkerConfig#globalIngestionHeapLimitBytes} evenly divided by {@link WorkerConfig#capacity}.
+ *
+ * The Appenderators created by this class share an executor pool for {@link IndexMerger} persist
+ * and merge operations, with concurrent operations limited to `druid.worker.capacity` divided by 2. This limit is imposed
+ * to reduce overall memory usage.
  */
 public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
 {
   private final ConcurrentHashMap<String, SinkQuerySegmentWalker> datasourceSegmentWalkers = new ConcurrentHashMap<>();
 
   private final ExecutorService queryExecutorService;
+  private final WorkerConfig workerConfig;
   private final Cache cache;
   private final CacheConfig cacheConfig;
   private final CachePopulatorStats cachePopulatorStats;
 
+  private ListeningExecutorService mergeExecutor;
+
   @Inject
   public UnifiedIndexerAppenderatorsManager(
       @Processing ExecutorService queryExecutorService,
+      WorkerConfig workerConfig,
       Cache cache,
       CacheConfig cacheConfig,
       CachePopulatorStats cachePopulatorStats
   )
   {
     this.queryExecutorService = queryExecutorService;
+    this.workerConfig = workerConfig;
     this.cache = cache;
     this.cacheConfig = cacheConfig;
     this.cachePopulatorStats = cachePopulatorStats;
+
+    int concurrentMerges = Math.max(1, workerConfig.getCapacity() / 2);
+    this.mergeExecutor = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(concurrentMerges, "unified-indexer-merge-pool-%d")
+    );
   }
 
   @Override
@@ -121,14 +158,14 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
 
     Appenderator appenderator = new AppenderatorImpl(
         schema,
-        config,
+        rewriteAppenderatorConfigMemoryLimits(config),
         metrics,
         dataSegmentPusher,
         objectMapper,
         segmentAnnouncer,
         segmentWalker,
         indexIO,
-        indexMerger,
+        wrapIndexMerger(indexMerger),
         cache
     );
 
@@ -149,12 +186,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
   {
     Appenderator appenderator = Appenderators.createOffline(
         schema,
-        config,
+        rewriteAppenderatorConfigMemoryLimits(config),
         metrics,
         dataSegmentPusher,
         objectMapper,
         indexIO,
-        indexMerger
+        wrapIndexMerger(indexMerger)
     );
     return appenderator;
   }
@@ -196,4 +233,291 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
   {
     return false;
   }
+
+  @Override
+  public void shutdown()
+  {
+    if (mergeExecutor != null) {
+      mergeExecutor.shutdownNow();
+      mergeExecutor = null;
+    }
+  }
+
+  private AppenderatorConfig rewriteAppenderatorConfigMemoryLimits(AppenderatorConfig baseConfig)
+  {
+    long perWorkerLimit = workerConfig.getGlobalIngestionHeapLimitBytes() / workerConfig.getCapacity();
+    return new MemoryParameterOverridingAppenderatorConfig(baseConfig, perWorkerLimit);
+  }
+
+  /**
+   * This is a wrapper around AppenderatorConfig that overrides the {@link AppenderatorConfig#getMaxBytesInMemory()}
+   * and {@link AppenderatorConfig#getMaxRowsInMemory()} parameters.
+   *
+   * Row-based limits are disabled by setting maxRowsInMemory to an essentially unlimited value.
+   * maxBytesInMemory is overridden with the provided value. These overrides replace whatever the user has specified.
+   */
+  private static class MemoryParameterOverridingAppenderatorConfig implements AppenderatorConfig
+  {
+    private final AppenderatorConfig baseConfig;
+    private final long newMaxBytesInMemory;
+
+    public MemoryParameterOverridingAppenderatorConfig(
+        AppenderatorConfig baseConfig,
+        long newMaxBytesInMemory
+    )
+    {
+      this.baseConfig = baseConfig;
+      this.newMaxBytesInMemory = newMaxBytesInMemory;
+    }
+
+    @Override
+    public boolean isReportParseExceptions()
+    {
+      return baseConfig.isReportParseExceptions();
+    }
+
+    @Override
+    public int getMaxRowsInMemory()
+    {
+      return Integer.MAX_VALUE; // unlimited, rely on maxBytesInMemory instead
+    }
+
+    @Override
+    public long getMaxBytesInMemory()
+    {
+      return newMaxBytesInMemory;
+    }
+
+    @Override
+    public int getMaxPendingPersists()
+    {
+      return baseConfig.getMaxPendingPersists();
+    }
+
+    @Nullable
+    @Override
+    public Integer getMaxRowsPerSegment()
+    {
+      return baseConfig.getMaxRowsPerSegment();
+    }
+
+    @Nullable
+    @Override
+    public Long getMaxTotalRows()
+    {
+      return baseConfig.getMaxTotalRows();
+    }
+
+    @Override
+    public Period getIntermediatePersistPeriod()
+    {
+      return baseConfig.getIntermediatePersistPeriod();
+    }
+
+    @Override
+    public IndexSpec getIndexSpec()
+    {
+      return baseConfig.getIndexSpec();
+    }
+
+    @Override
+    public IndexSpec getIndexSpecForIntermediatePersists()
+    {
+      return baseConfig.getIndexSpecForIntermediatePersists();
+    }
+
+    @Override
+    public File getBasePersistDirectory()
+    {
+      return baseConfig.getBasePersistDirectory();
+    }
+
+    @Nullable
+    @Override
+    public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+    {
+      return baseConfig.getSegmentWriteOutMediumFactory();
+    }
+  }
+
+  private IndexMerger wrapIndexMerger(IndexMerger baseMerger)
+  {
+    return new LimitedPoolIndexMerger(baseMerger, mergeExecutor);
+  }
+
+
+  /**
+   * This wrapper around IndexMerger limits concurrent calls to the merge/persist methods used by
+   * {@link AppenderatorImpl} with a shared executor service. Merge/persist methods that are not used by
+   * AppenderatorImpl will throw an exception if called.
+   */
+  public static class LimitedPoolIndexMerger implements IndexMerger
+  {
+    private static final String ERROR_MSG = "Shouldn't be called";
+
+    private final IndexMerger baseMerger;
+
+    private final ListeningExecutorService mergeExecutor;
+
+    public LimitedPoolIndexMerger(
+        IndexMerger baseMerger,
+        ListeningExecutorService mergeExecutor
+    )
+    {
+      this.baseMerger = baseMerger;
+      this.mergeExecutor = mergeExecutor;
+    }
+
+    /**
+     * Submits the merge to the shared merge executor and blocks until it completes.
+     */
+    @Override
+    public File mergeQueryableIndex(
+        List<QueryableIndex> indexes,
+        boolean rollup,
+        AggregatorFactory[] metricAggs,
+        File outDir,
+        IndexSpec indexSpec,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+    ) throws IOException
+    {
+      // A Callable lambda may throw IOException itself; it resurfaces below as the ExecutionException cause.
+      Callable<File> mergeTask = () -> baseMerger.mergeQueryableIndex(
+          indexes,
+          rollup,
+          metricAggs,
+          outDir,
+          indexSpec,
+          segmentWriteOutMediumFactory
+      );
+      ListenableFuture<File> mergeFuture = mergeExecutor.submit(mergeTask);
+
+      try {
+        return mergeFuture.get();
+      }
+      catch (InterruptedException ie) {
+        // Restore the interrupt flag, which Future.get() clears, so code up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+      catch (java.util.concurrent.ExecutionException ee) {
+        // Rethrow the merger's own IOException as declared rather than burying it in unchecked wrappers.
+        if (ee.getCause() instanceof IOException) {
+          throw (IOException) ee.getCause();
+        }
+        throw new RuntimeException(ee);
+      }
+    }
+
+    /**
+     * Submits the persist to the shared merge executor and blocks until it completes.
+     */
+    @Override
+    public File persist(
+        IncrementalIndex index,
+        Interval dataInterval,
+        File outDir,
+        IndexSpec indexSpec,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+    ) throws IOException
+    {
+      // A Callable lambda may throw IOException itself; it resurfaces below as the ExecutionException cause.
+      Callable<File> persistTask = () -> baseMerger.persist(
+          index,
+          dataInterval,
+          outDir,
+          indexSpec,
+          segmentWriteOutMediumFactory
+      );
+      ListenableFuture<File> persistFuture = mergeExecutor.submit(persistTask);
+
+      try {
+        return persistFuture.get();
+      }
+      catch (InterruptedException ie) {
+        // Restore the interrupt flag, which Future.get() clears, so code up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+      catch (java.util.concurrent.ExecutionException ee) {
+        // Rethrow the merger's own IOException as declared rather than burying it in unchecked wrappers.
+        if (ee.getCause() instanceof IOException) {
+          throw (IOException) ee.getCause();
+        }
+        throw new RuntimeException(ee);
+      }
+    }
+
+    @Override
+    public File merge(
+        List<IndexableAdapter> indexes,
+        boolean rollup,
+        AggregatorFactory[] metricAggs,
+        File outDir,
+        IndexSpec indexSpec
+    ) throws IOException
+    {
+      throw new UOE(ERROR_MSG);
+    }
+
+    @Override
+    public File convert(
+        File inDir,
+        File outDir,
+        IndexSpec indexSpec
+    ) throws IOException
+    {
+      throw new UOE(ERROR_MSG);
+    }
+
+    @Override
+    public File append(
+        List<IndexableAdapter> indexes,
+        AggregatorFactory[] aggregators,
+        File outDir,
+        IndexSpec indexSpec,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+    ) throws IOException
+    {
+      throw new UOE(ERROR_MSG);
+    }
+
+    @Override
+    public File persist(
+        IncrementalIndex index,
+        File outDir,
+        IndexSpec indexSpec,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+    ) throws IOException
+    {
+      throw new UOE(ERROR_MSG);
+    }
+
+    @Override
+    public File persist(
+        IncrementalIndex index,
+        Interval dataInterval,
+        File outDir,
+        IndexSpec indexSpec,
+        ProgressIndicator progress,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+    ) throws IOException
+    {
+      throw new UOE(ERROR_MSG);
+    }
+
+    @Override
+    public File mergeQueryableIndex(
+        List<QueryableIndex> indexes,
+        boolean rollup,
+        AggregatorFactory[] metricAggs,
+        File outDir,
+        IndexSpec indexSpec,
+        ProgressIndicator progress,
+        @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+    ) throws IOException
+    {
+      throw new UOE(ERROR_MSG);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org