You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2019/06/14 15:59:43 UTC

[incubator-druid] branch master updated: Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bee6ad  Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
3bee6ad is described below

commit 3bee6adcf7338b501687f11ea866608e873c5b0a
Author: Sashidhar Thallam <t....@gmail.com>
AuthorDate: Fri Jun 14 21:29:36 2019 +0530

    Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
    
    * https://github.com/apache/incubator-druid/issues/7316 Use Map.putIfAbsent() instead of containsKey() + put()
    
    * fixing indentation
    
    * Using map.computeIfAbsent() instead of map.putIfAbsent() where appropriate
    
    * fixing checkstyle
    
    * Changing the recommendation text
    
    * Reverting auto changes made by IDE
    
    * Implementing recommendation: A ConcurrentHashMap on which computeIfAbsent() is called should be assigned into variables of ConcurrentHashMap type, not ConcurrentMap
    
    * Removing unused import
---
 .idea/inspectionProfiles/Druid.xml                 |   7 +-
 .../druid/storage/s3/S3DataSegmentMoverTest.java   |   4 +-
 .../indexer/DetermineHashedPartitionsJob.java      |   4 +-
 .../ActionBasedUsedSegmentChecker.java             |   5 +-
 .../druid/indexing/common/task/IndexTask.java      |   8 +-
 .../firehose/IngestSegmentFirehoseFactory.java     | 149 +++---
 .../druid/indexing/overlord/ForkingTaskRunner.java | 548 ++++++++++-----------
 .../metadata/SQLMetadataSupervisorManager.java     |   4 +-
 .../druid/server/http/DataSourcesResource.java     |   4 +-
 .../druid/client/CachingClusteredClientTest.java   |   4 +-
 .../appenderator/StreamAppenderatorDriverTest.java |   4 +-
 .../java/org/apache/druid/sql/SqlLifecycle.java    |   4 +-
 .../util/SpecificSegmentsQuerySegmentWalker.java   |   4 +-
 13 files changed, 365 insertions(+), 384 deletions(-)

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 9770c11..ed890c8 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -306,6 +306,11 @@
         <constraint name="x" maxCount="2147483647" within="" contains="" />
         <constraint name="ImmutableMap" regexp="Immutable.*" within="" contains="" />
       </searchConfiguration>
+      <searchConfiguration name="Use map.putIfAbsent(k,v) or map.computeIfAbsent(k,v) where appropriate instead of containsKey() + put(). If computing v is expensive or has side effects use map.computeIfAbsent() instead" created="1558868694225" text="if (!$m$.containsKey($k$)) {&#10;    $m$.put($k$, $v$);&#10;}" recursive="false" caseInsensitive="true" type="JAVA">
+        <constraint name="m" within="" contains="" />
+        <constraint name="k" within="" contains="" />
+        <constraint name="v" within="" contains="" />
+      </searchConfiguration>
     </inspection_tool>
     <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
@@ -400,4 +405,4 @@
       <option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
     </inspection_tool>
   </profile>
-</component>
\ No newline at end of file
+</component>
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
index ec30aa9..34496d9 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
@@ -261,9 +261,7 @@ public class S3DataSegmentMoverTest
     @Override
     public PutObjectResult putObject(String bucketName, String key, File file)
     {
-      if (!storage.containsKey(bucketName)) {
-        storage.put(bucketName, new HashSet<>());
-      }
+      storage.putIfAbsent(bucketName, new HashSet<>());
       storage.get(bucketName).add(key);
       return new PutObjectResult();
     }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index c83bc08..17f5172 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -307,9 +307,7 @@ public class DetermineHashedPartitionsJob implements Jobby
                          .getSegmentGranularity()
                          .bucket(DateTimes.utc(inputRow.getTimestampFromEpoch()));
 
-        if (!hyperLogLogs.containsKey(interval)) {
-          hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
-        }
+        hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector());
       } else {
         final Optional<Interval> maybeInterval = config.getGranularitySpec()
                                                        .bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
index a1eb90f..96ce6ae 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
@@ -50,9 +50,8 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
     // Group by dataSource
     final Map<String, Set<SegmentIdWithShardSpec>> identifiersByDataSource = new TreeMap<>();
     for (SegmentIdWithShardSpec identifier : identifiers) {
-      if (!identifiersByDataSource.containsKey(identifier.getDataSource())) {
-        identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>());
-      }
+      identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>());
+
       identifiersByDataSource.get(identifier.getDataSource()).add(identifier);
     }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index b7e5123..458c621 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -768,9 +768,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
           }
 
           if (determineNumPartitions) {
-            if (!hllCollectors.containsKey(interval)) {
-              hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-            }
+            hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
 
             List<Object> groupKey = Rows.toGroupKey(
                 queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
@@ -781,9 +779,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
           } else {
             // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
             // for the interval and don't instantiate a HLL collector
-            if (!hllCollectors.containsKey(interval)) {
-              hllCollectors.put(interval, Optional.absent());
-            }
+            hllCollectors.putIfAbsent(interval, Optional.absent());
           }
           determinePartitionsMeters.incrementProcessed();
         }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 4d1e0bf..2caf91d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -204,87 +204,87 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
         segmentIds
     );
 
-    try {
-      final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
-
-      // Download all segments locally.
-      // Note: this requires enough local storage space to fit all of the segments, even though
-      // IngestSegmentFirehose iterates over the segments in series. We may want to change this
-      // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
-      final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
-      Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
-      for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
-        for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
-          final DataSegment segment = chunk.getObject();
-          if (!segmentFileMap.containsKey(segment)) {
-            segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+    final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
+
+    // Download all segments locally.
+    // Note: this requires enough local storage space to fit all of the segments, even though
+    // IngestSegmentFirehose iterates over the segments in series. We may want to change this
+    // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
+    final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+    Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
+    for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
+      for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+        final DataSegment segment = chunk.getObject();
+
+        segmentFileMap.computeIfAbsent(segment, k -> {
+          try {
+            return segmentLoader.getSegmentFiles(segment);
           }
-        }
-      }
+          catch (SegmentLoadingException e) {
+            throw new RuntimeException(e);
+          }
+        });
 
-      final List<String> dims;
-      if (dimensions != null) {
-        dims = dimensions;
-      } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
-        dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
-      } else {
-        dims = getUniqueDimensions(
-            timeLineSegments,
-            inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
-        );
       }
+    }
 
-      final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+    final List<String> dims;
+    if (dimensions != null) {
+      dims = dimensions;
+    } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
+      dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
+    } else {
+      dims = getUniqueDimensions(
+        timeLineSegments,
+        inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
+      );
+    }
 
-      final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
-          Iterables.concat(
-              Iterables.transform(
-                  timeLineSegments,
-                  new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>()
-                  {
+    final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+
+    final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
+        Iterables.concat(
+        Iterables.transform(
+          timeLineSegments,
+          new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>() {
+            @Override
+            public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+            {
+              return
+                Iterables.transform(
+                  holder.getObject(),
+                  new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>() {
                     @Override
-                    public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+                    public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
                     {
-                      return
-                          Iterables.transform(
-                              holder.getObject(),
-                              new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>()
-                              {
-                                @Override
-                                public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
-                                {
-                                  final DataSegment segment = input.getObject();
-                                  try {
-                                    return new WindowedStorageAdapter(
-                                        new QueryableIndexStorageAdapter(
-                                            indexIO.loadIndex(
-                                                Preconditions.checkNotNull(
-                                                    segmentFileMap.get(segment),
-                                                    "File for segment %s", segment.getId()
-                                                )
-                                            )
-                                        ),
-                                        holder.getInterval()
-                                    );
-                                  }
-                                  catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                  }
-                                }
-                              }
-                          );
+                      final DataSegment segment = input.getObject();
+                      try {
+                        return new WindowedStorageAdapter(
+                          new QueryableIndexStorageAdapter(
+                            indexIO.loadIndex(
+                              Preconditions.checkNotNull(
+                                segmentFileMap.get(segment),
+                                "File for segment %s", segment.getId()
+                              )
+                            )
+                          ),
+                          holder.getInterval()
+                        );
+                      }
+                      catch (IOException e) {
+                        throw new RuntimeException(e);
+                      }
                     }
                   }
-              )
-          )
-      );
+                );
+            }
+          }
+        )
+      )
+    );
 
-      final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
-      return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
-    }
-    catch (SegmentLoadingException e) {
-      throw new RuntimeException(e);
-    }
+    final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
+    return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
   }
 
   private long jitter(long input)
@@ -508,13 +508,14 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
     // segments to olders.
 
     // timelineSegments are sorted in order of interval
-    int index = 0;
+    int[] index = {0};
     for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
       for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
         for (String metric : chunk.getObject().getMetrics()) {
-          if (!uniqueMetrics.containsKey(metric)) {
-            uniqueMetrics.put(metric, index++);
+          uniqueMetrics.computeIfAbsent(metric, k -> {
+            return index[0]++;
           }
+          );
         }
       }
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05f2f52..116747b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -84,7 +84,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -108,7 +107,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
 
   /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
-  private final ConcurrentMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
 
   private volatile boolean stopping = false;
 
@@ -214,309 +213,306 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   public ListenableFuture<TaskStatus> run(final Task task)
   {
     synchronized (tasks) {
-      if (!tasks.containsKey(task.getId())) {
-        tasks.put(
-            task.getId(),
-            new ForkingTaskRunnerWorkItem(
-                task,
-                exec.submit(
-                    new Callable<TaskStatus>()
-                    {
-                      @Override
-                      public TaskStatus call()
-                      {
-                        final String attemptUUID = UUID.randomUUID().toString();
-                        final File taskDir = taskConfig.getTaskDir(task.getId());
-                        final File attemptDir = new File(taskDir, attemptUUID);
-
-                        final ProcessHolder processHolder;
-                        final String childHost = node.getHost();
-                        int childPort = -1;
-                        int tlsChildPort = -1;
-
-                        if (node.isEnablePlaintextPort()) {
-                          childPort = portFinder.findUnusedPort();
+      tasks.computeIfAbsent(
+          task.getId(), k ->
+          new ForkingTaskRunnerWorkItem(
+            task,
+            exec.submit(
+              new Callable<TaskStatus>() {
+                @Override
+                public TaskStatus call()
+                {
+                  final String attemptUUID = UUID.randomUUID().toString();
+                  final File taskDir = taskConfig.getTaskDir(task.getId());
+                  final File attemptDir = new File(taskDir, attemptUUID);
+
+                  final ProcessHolder processHolder;
+                  final String childHost = node.getHost();
+                  int childPort = -1;
+                  int tlsChildPort = -1;
+
+                  if (node.isEnablePlaintextPort()) {
+                    childPort = portFinder.findUnusedPort();
+                  }
+
+                  if (node.isEnableTlsPort()) {
+                    tlsChildPort = portFinder.findUnusedPort();
+                  }
+
+                  final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
+
+                  try {
+                    final Closer closer = Closer.create();
+                    try {
+                      if (!attemptDir.mkdirs()) {
+                        throw new IOE("Could not create directories: %s", attemptDir);
+                      }
+
+                      final File taskFile = new File(taskDir, "task.json");
+                      final File statusFile = new File(attemptDir, "status.json");
+                      final File logFile = new File(taskDir, "log");
+                      final File reportsFile = new File(attemptDir, "report.json");
+
+                      // time to adjust process holders
+                      synchronized (tasks) {
+                        final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
+
+                        if (taskWorkItem.shutdown) {
+                          throw new IllegalStateException("Task has been shut down!");
                         }
 
-                        if (node.isEnableTlsPort()) {
-                          tlsChildPort = portFinder.findUnusedPort();
+                        if (taskWorkItem == null) {
+                          log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
+                          throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
                         }
 
-                        final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
+                        if (taskWorkItem.processHolder != null) {
+                          log.makeAlert("WTF?! TaskInfo already has a processHolder")
+                            .addData("task", task.getId())
+                            .emit();
+                          throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
+                        }
 
-                        try {
-                          final Closer closer = Closer.create();
-                          try {
-                            if (!attemptDir.mkdirs()) {
-                              throw new IOE("Could not create directories: %s", attemptDir);
-                            }
+                        final List<String> command = new ArrayList<>();
+                        final String taskClasspath;
+                        if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
+                          taskClasspath = Joiner.on(File.pathSeparator).join(
+                            task.getClasspathPrefix(),
+                            config.getClasspath()
+                          );
+                        } else {
+                          taskClasspath = config.getClasspath();
+                        }
 
-                            final File taskFile = new File(taskDir, "task.json");
-                            final File statusFile = new File(attemptDir, "status.json");
-                            final File logFile = new File(taskDir, "log");
-                            final File reportsFile = new File(attemptDir, "report.json");
-
-                            // time to adjust process holders
-                            synchronized (tasks) {
-                              final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
-
-                              if (taskWorkItem.shutdown) {
-                                throw new IllegalStateException("Task has been shut down!");
-                              }
-
-                              if (taskWorkItem == null) {
-                                log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
-                                throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
-                              }
-
-                              if (taskWorkItem.processHolder != null) {
-                                log.makeAlert("WTF?! TaskInfo already has a processHolder")
-                                   .addData("task", task.getId())
-                                   .emit();
-                                throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
-                              }
-
-                              final List<String> command = new ArrayList<>();
-                              final String taskClasspath;
-                              if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
-                                taskClasspath = Joiner.on(File.pathSeparator).join(
-                                    task.getClasspathPrefix(),
-                                    config.getClasspath()
-                                );
-                              } else {
-                                taskClasspath = config.getClasspath();
-                              }
-
-                              command.add(config.getJavaCommand());
-                              command.add("-cp");
-                              command.add(taskClasspath);
-
-                              Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
-                              Iterables.addAll(command, config.getJavaOptsArray());
-
-                              // Override task specific javaOpts
-                              Object taskJavaOpts = task.getContextValue(
-                                  ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
-                              );
-                              if (taskJavaOpts != null) {
-                                Iterables.addAll(
-                                    command,
-                                    new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
-                                );
-                              }
-
-                              for (String propName : props.stringPropertyNames()) {
-                                for (String allowedPrefix : config.getAllowedPrefixes()) {
-                                  // See https://github.com/apache/incubator-druid/issues/1841
-                                  if (propName.startsWith(allowedPrefix)
-                                      && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
-                                      && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
-                                      ) {
-                                    command.add(
-                                        StringUtils.format(
-                                            "-D%s=%s",
-                                            propName,
-                                            props.getProperty(propName)
-                                        )
-                                    );
-                                  }
-                                }
-                              }
-
-                              // Override child JVM specific properties
-                              for (String propName : props.stringPropertyNames()) {
-                                if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
-                                  command.add(
-                                      StringUtils.format(
-                                          "-D%s=%s",
-                                          propName.substring(CHILD_PROPERTY_PREFIX.length()),
-                                          props.getProperty(propName)
-                                      )
-                                  );
-                                }
-                              }
-
-                              // Override task specific properties
-                              final Map<String, Object> context = task.getContext();
-                              if (context != null) {
-                                for (String propName : context.keySet()) {
-                                  if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
-                                    command.add(
-                                        StringUtils.format(
-                                            "-D%s=%s",
-                                            propName.substring(CHILD_PROPERTY_PREFIX.length()),
-                                            task.getContextValue(propName)
-                                        )
-                                    );
-                                  }
-                                }
-                              }
-
-                              // Add dataSource, taskId and taskType for metrics or logging
-                              command.add(
-                                  StringUtils.format(
-                                      "-D%s%s=%s",
-                                      MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                                      DruidMetrics.DATASOURCE,
-                                      task.getDataSource()
-                                  )
-                              );
+                        command.add(config.getJavaCommand());
+                        command.add("-cp");
+                        command.add(taskClasspath);
+
+                        Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
+                        Iterables.addAll(command, config.getJavaOptsArray());
+
+                        // Override task specific javaOpts
+                        Object taskJavaOpts = task.getContextValue(
+                            ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+                        );
+                        if (taskJavaOpts != null) {
+                          Iterables.addAll(
+                              command,
+                              new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
+                          );
+                        }
+
+                        for (String propName : props.stringPropertyNames()) {
+                          for (String allowedPrefix : config.getAllowedPrefixes()) {
+                            // See https://github.com/apache/incubator-druid/issues/1841
+                            if (propName.startsWith(allowedPrefix)
+                                && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
+                                && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
+                            ) {
                               command.add(
                                   StringUtils.format(
-                                      "-D%s%s=%s",
-                                      MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                                      DruidMetrics.TASK_ID,
-                                      task.getId()
-                                  )
+                                  "-D%s=%s",
+                                  propName,
+                                  props.getProperty(propName)
+                                )
                               );
+                            }
+                          }
+                        }
+
+                        // Override child JVM specific properties
+                        for (String propName : props.stringPropertyNames()) {
+                          if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
+                            command.add(
+                                StringUtils.format(
+                                "-D%s=%s",
+                                propName.substring(CHILD_PROPERTY_PREFIX.length()),
+                                props.getProperty(propName)
+                              )
+                            );
+                          }
+                        }
+
+                        // Override task specific properties
+                        final Map<String, Object> context = task.getContext();
+                        if (context != null) {
+                          for (String propName : context.keySet()) {
+                            if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
                               command.add(
                                   StringUtils.format(
-                                      "-D%s%s=%s",
-                                      MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                                      DruidMetrics.TASK_TYPE,
-                                      task.getType()
-                                  )
+                                  "-D%s=%s",
+                                  propName.substring(CHILD_PROPERTY_PREFIX.length()),
+                                  task.getContextValue(propName)
+                                )
                               );
+                            }
+                          }
+                        }
 
-                              command.add(StringUtils.format("-Ddruid.host=%s", childHost));
-                              command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
-                              command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
-                              /**
-                               * These are not enabled per default to allow the user to either set or not set them
-                               * Users are highly suggested to be set in druid.indexer.runner.javaOpts
-                               * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
-                               * for more information
-                               command.add("-XX:+UseThreadPriorities");
-                               command.add("-XX:ThreadPriorityPolicy=42");
-                               */
-
-                              command.add("org.apache.druid.cli.Main");
-                              command.add("internal");
-                              command.add("peon");
-                              command.add(taskFile.toString());
-                              command.add(statusFile.toString());
-                              command.add(reportsFile.toString());
-                              String nodeType = task.getNodeType();
-                              if (nodeType != null) {
-                                command.add("--nodeType");
-                                command.add(nodeType);
-                              }
-
-                              if (!taskFile.exists()) {
-                                jsonMapper.writeValue(taskFile, task);
-                              }
-
-                              log.info("Running command: %s", Joiner.on(" ").join(command));
-                              taskWorkItem.processHolder = new ProcessHolder(
-                                  new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
-                                  logFile,
-                                  taskLocation.getHost(),
-                                  taskLocation.getPort(),
-                                  taskLocation.getTlsPort()
-                              );
+                        // Add dataSource, taskId and taskType for metrics or logging
+                        command.add(
+                            StringUtils.format(
+                            "-D%s%s=%s",
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
+                            DruidMetrics.DATASOURCE,
+                            task.getDataSource()
+                          )
+                        );
+                        command.add(
+                            StringUtils.format(
+                            "-D%s%s=%s",
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
+                            DruidMetrics.TASK_ID,
+                            task.getId()
+                          )
+                        );
+                        command.add(
+                            StringUtils.format(
+                            "-D%s%s=%s",
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
+                            DruidMetrics.TASK_TYPE,
+                            task.getType()
+                          )
+                        );
+
+                        command.add(StringUtils.format("-Ddruid.host=%s", childHost));
+                        command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
+                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
+                        /**
+                         * These are not enabled per default to allow the user to either set or not set them
+                         * Users are highly suggested to be set in druid.indexer.runner.javaOpts
+                         * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
+                         * for more information
+                         command.add("-XX:+UseThreadPriorities");
+                         command.add("-XX:ThreadPriorityPolicy=42");
+                         */
+
+                        command.add("org.apache.druid.cli.Main");
+                        command.add("internal");
+                        command.add("peon");
+                        command.add(taskFile.toString());
+                        command.add(statusFile.toString());
+                        command.add(reportsFile.toString());
+                        String nodeType = task.getNodeType();
+                        if (nodeType != null) {
+                          command.add("--nodeType");
+                          command.add(nodeType);
+                        }
 
-                              processHolder = taskWorkItem.processHolder;
-                              processHolder.registerWithCloser(closer);
-                            }
+                        if (!taskFile.exists()) {
+                          jsonMapper.writeValue(taskFile, task);
+                        }
 
-                            TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
-                            TaskRunnerUtils.notifyStatusChanged(
-                                listeners,
-                                task.getId(),
-                                TaskStatus.running(task.getId())
-                            );
+                        log.info("Running command: %s", Joiner.on(" ").join(command));
+                        taskWorkItem.processHolder = new ProcessHolder(
+                          new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
+                          logFile,
+                          taskLocation.getHost(),
+                          taskLocation.getPort(),
+                          taskLocation.getTlsPort()
+                        );
+
+                        processHolder = taskWorkItem.processHolder;
+                        processHolder.registerWithCloser(closer);
+                      }
 
-                            log.info("Logging task %s output to: %s", task.getId(), logFile);
-                            boolean runFailed = true;
+                      TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
+                      TaskRunnerUtils.notifyStatusChanged(
+                          listeners,
+                          task.getId(),
+                          TaskStatus.running(task.getId())
+                      );
 
-                            final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
+                      log.info("Logging task %s output to: %s", task.getId(), logFile);
+                      boolean runFailed = true;
 
-                            // This will block for a while. So we append the thread information with more details
-                            final String priorThreadName = Thread.currentThread().getName();
-                            Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
+                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
 
-                            try (final OutputStream toLogfile = logSink.openStream()) {
-                              ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                              final int statusCode = processHolder.process.waitFor();
-                              log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                              if (statusCode == 0) {
-                                runFailed = false;
-                              }
-                            }
-                            finally {
-                              Thread.currentThread().setName(priorThreadName);
-                              // Upload task logs
-                              taskLogPusher.pushTaskLog(task.getId(), logFile);
-                              if (reportsFile.exists()) {
-                                taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                              }
-                            }
+                      // This will block for a while. So we append the thread information with more details
+                      final String priorThreadName = Thread.currentThread().getName();
+                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
 
-                            TaskStatus status;
-                            if (!runFailed) {
-                              // Process exited successfully
-                              status = jsonMapper.readValue(statusFile, TaskStatus.class);
-                            } else {
-                              // Process exited unsuccessfully
-                              status = TaskStatus.failure(task.getId());
-                            }
+                      try (final OutputStream toLogfile = logSink.openStream()) {
+                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
+                        final int statusCode = processHolder.process.waitFor();
+                        log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
+                        if (statusCode == 0) {
+                          runFailed = false;
+                        }
+                      }
+                      finally {
+                        Thread.currentThread().setName(priorThreadName);
+                        // Upload task logs
+                        taskLogPusher.pushTaskLog(task.getId(), logFile);
+                        if (reportsFile.exists()) {
+                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
+                        }
+                      }
 
-                            TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
-                            return status;
-                          }
-                          catch (Throwable t) {
-                            throw closer.rethrow(t);
-                          }
-                          finally {
-                            closer.close();
-                          }
+                      TaskStatus status;
+                      if (!runFailed) {
+                        // Process exited successfully
+                        status = jsonMapper.readValue(statusFile, TaskStatus.class);
+                      } else {
+                        // Process exited unsuccessfully
+                        status = TaskStatus.failure(task.getId());
+                      }
+
+                      TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
+                      return status;
+                    }
+                    catch (Throwable t) {
+                      throw closer.rethrow(t);
+                    }
+                    finally {
+                      closer.close();
+                    }
+                  }
+                  catch (Throwable t) {
+                    log.info(t, "Exception caught during execution");
+                    throw new RuntimeException(t);
+                  }
+                  finally {
+                    try {
+                      synchronized (tasks) {
+                        final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
+                        if (taskWorkItem != null && taskWorkItem.processHolder != null) {
+                          taskWorkItem.processHolder.process.destroy();
                         }
-                        catch (Throwable t) {
-                          log.info(t, "Exception caught during execution");
-                          throw new RuntimeException(t);
+                        if (!stopping) {
+                          saveRunningTasks();
                         }
-                        finally {
-                          try {
-                            synchronized (tasks) {
-                              final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
-                              if (taskWorkItem != null && taskWorkItem.processHolder != null) {
-                                taskWorkItem.processHolder.process.destroy();
-                              }
-                              if (!stopping) {
-                                saveRunningTasks();
-                              }
-                            }
+                      }
 
-                            if (node.isEnablePlaintextPort()) {
-                              portFinder.markPortUnused(childPort);
-                            }
-                            if (node.isEnableTlsPort()) {
-                              portFinder.markPortUnused(tlsChildPort);
-                            }
+                      if (node.isEnablePlaintextPort()) {
+                        portFinder.markPortUnused(childPort);
+                      }
+                      if (node.isEnableTlsPort()) {
+                        portFinder.markPortUnused(tlsChildPort);
+                      }
 
-                            try {
-                              if (!stopping && taskDir.exists()) {
-                                log.info("Removing task directory: %s", taskDir);
-                                FileUtils.deleteDirectory(taskDir);
-                              }
-                            }
-                            catch (Exception e) {
-                              log.makeAlert(e, "Failed to delete task directory")
-                                 .addData("taskDir", taskDir.toString())
-                                 .addData("task", task.getId())
-                                 .emit();
-                            }
-                          }
-                          catch (Exception e) {
-                            log.error(e, "Suppressing exception caught while cleaning up task");
-                          }
+                      try {
+                        if (!stopping && taskDir.exists()) {
+                          log.info("Removing task directory: %s", taskDir);
+                          FileUtils.deleteDirectory(taskDir);
                         }
                       }
+                      catch (Exception e) {
+                        log.makeAlert(e, "Failed to delete task directory")
+                          .addData("taskDir", taskDir.toString())
+                          .addData("task", task.getId())
+                          .emit();
+                      }
+                    }
+                    catch (Exception e) {
+                      log.error(e, "Suppressing exception caught while cleaning up task");
                     }
-                )
+                  }
+                }
+              }
             )
-        );
-      }
+          )
+      );
       saveRunningTasks();
       return tasks.get(task.getId()).getResult();
     }
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
index 83fcc05..a5354b1 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
@@ -166,9 +166,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
                       {
                         try {
                           String specId = pair.lhs;
-                          if (!retVal.containsKey(specId)) {
-                            retVal.put(specId, new ArrayList<>());
-                          }
+                          retVal.putIfAbsent(specId, new ArrayList<>());
 
                           retVal.get(specId).add(pair.rhs);
                           return retVal;
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 3787cc6..9255e7c 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -549,9 +549,7 @@ public class DataSourcesResource
         continue;
       }
 
-      if (!tierDistinctSegments.containsKey(tier)) {
-        tierDistinctSegments.put(tier, new HashSet<>());
-      }
+      tierDistinctSegments.computeIfAbsent(tier, k -> new HashSet<>());
 
       long dataSourceSegmentSize = 0;
       for (DataSegment dataSegment : druidDataSource.getSegments()) {
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index de24926..8170014 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2198,9 +2198,7 @@ public class CachingClusteredClientTest
       serverExpectationList.add(serverExpectations);
       for (int j = 0; j < numChunks; ++j) {
         DruidServer lastServer = servers[random.nextInt(servers.length)];
-        if (!serverExpectations.containsKey(lastServer)) {
-          serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
-        }
+        serverExpectations.computeIfAbsent(lastServer, server -> new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
 
         DataSegment mockSegment = makeMock(mocks, DataSegment.class);
         ServerExpectation<Object> expectation = new ServerExpectation<>(
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 9c3e9cc..c7475d9 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -428,9 +428,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
       synchronized (counters) {
         DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
         final long timestampTruncated = dateTimeTruncated.getMillis();
-        if (!counters.containsKey(timestampTruncated)) {
-          counters.put(timestampTruncated, new AtomicInteger());
-        }
+        counters.putIfAbsent(timestampTruncated, new AtomicInteger());
         final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
         return new SegmentIdWithShardSpec(
             dataSource,
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index a9c78c4..80cfd04 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -122,9 +122,7 @@ public class SqlLifecycle
     if (queryContext != null) {
       newContext.putAll(queryContext);
     }
-    if (!newContext.containsKey(PlannerContext.CTX_SQL_QUERY_ID)) {
-      newContext.put(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString());
-    }
+    newContext.computeIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, k -> UUID.randomUUID().toString());
     return newContext;
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index af0e894..4d1206b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -78,9 +78,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
   )
   {
     final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
-    if (!timelines.containsKey(descriptor.getDataSource())) {
-      timelines.put(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural()));
-    }
+    timelines.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
 
     final VersionedIntervalTimeline<String, Segment> timeline = timelines.get(descriptor.getDataSource());
     timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org