You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2019/06/14 15:59:43 UTC
[incubator-druid] branch master updated: Use map.putIfAbsent() or
map.computeIfAbsent() as appropriate instead of containsKey() + put()
(#7764)
This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3bee6ad Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
3bee6ad is described below
commit 3bee6adcf7338b501687f11ea866608e873c5b0a
Author: Sashidhar Thallam <t....@gmail.com>
AuthorDate: Fri Jun 14 21:29:36 2019 +0530
Use map.putIfAbsent() or map.computeIfAbsent() as appropriate instead of containsKey() + put() (#7764)
* Use Map.putIfAbsent() instead of containsKey() + put() (see https://github.com/apache/incubator-druid/issues/7316)
* fixing indentation
* Using map.computeIfAbsent() instead of map.putIfAbsent() where appropriate
* fixing checkstyle
* Changing the recommendation text
* Reverting auto changes made by IDE
* Implementing recommendation: a ConcurrentHashMap on which computeIfAbsent() is called should be assigned to variables of ConcurrentHashMap type, not ConcurrentMap
* Removing unused import
---
.idea/inspectionProfiles/Druid.xml | 7 +-
.../druid/storage/s3/S3DataSegmentMoverTest.java | 4 +-
.../indexer/DetermineHashedPartitionsJob.java | 4 +-
.../ActionBasedUsedSegmentChecker.java | 5 +-
.../druid/indexing/common/task/IndexTask.java | 8 +-
.../firehose/IngestSegmentFirehoseFactory.java | 149 +++---
.../druid/indexing/overlord/ForkingTaskRunner.java | 548 ++++++++++-----------
.../metadata/SQLMetadataSupervisorManager.java | 4 +-
.../druid/server/http/DataSourcesResource.java | 4 +-
.../druid/client/CachingClusteredClientTest.java | 4 +-
.../appenderator/StreamAppenderatorDriverTest.java | 4 +-
.../java/org/apache/druid/sql/SqlLifecycle.java | 4 +-
.../util/SpecificSegmentsQuerySegmentWalker.java | 4 +-
13 files changed, 365 insertions(+), 384 deletions(-)
diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index 9770c11..ed890c8 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -306,6 +306,11 @@
<constraint name="x" maxCount="2147483647" within="" contains="" />
<constraint name="ImmutableMap" regexp="Immutable.*" within="" contains="" />
</searchConfiguration>
+ <searchConfiguration name="Use map.putIfAbsent(k,v) or map.computeIfAbsent(k,v) where appropriate instead of containsKey() + put(). If computing v is expensive or has side effects use map.computeIfAbsent() instead" created="1558868694225" text="if (!$m$.containsKey($k$)) { $m$.put($k$, $v$); }" recursive="false" caseInsensitive="true" type="JAVA">
+ <constraint name="m" within="" contains="" />
+ <constraint name="k" within="" contains="" />
+ <constraint name="v" within="" contains="" />
+ </searchConfiguration>
</inspection_tool>
<inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
@@ -400,4 +405,4 @@
<option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
</inspection_tool>
</profile>
-</component>
\ No newline at end of file
+</component>
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
index ec30aa9..34496d9 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentMoverTest.java
@@ -261,9 +261,7 @@ public class S3DataSegmentMoverTest
@Override
public PutObjectResult putObject(String bucketName, String key, File file)
{
- if (!storage.containsKey(bucketName)) {
- storage.put(bucketName, new HashSet<>());
- }
+ storage.putIfAbsent(bucketName, new HashSet<>());
storage.get(bucketName).add(key);
return new PutObjectResult();
}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
index c83bc08..17f5172 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java
@@ -307,9 +307,7 @@ public class DetermineHashedPartitionsJob implements Jobby
.getSegmentGranularity()
.bucket(DateTimes.utc(inputRow.getTimestampFromEpoch()));
- if (!hyperLogLogs.containsKey(interval)) {
- hyperLogLogs.put(interval, HyperLogLogCollector.makeLatestCollector());
- }
+ hyperLogLogs.computeIfAbsent(interval, intv -> HyperLogLogCollector.makeLatestCollector());
} else {
final Optional<Interval> maybeInterval = config.getGranularitySpec()
.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()));
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
index a1eb90f..96ce6ae 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/appenderator/ActionBasedUsedSegmentChecker.java
@@ -50,9 +50,8 @@ public class ActionBasedUsedSegmentChecker implements UsedSegmentChecker
// Group by dataSource
final Map<String, Set<SegmentIdWithShardSpec>> identifiersByDataSource = new TreeMap<>();
for (SegmentIdWithShardSpec identifier : identifiers) {
- if (!identifiersByDataSource.containsKey(identifier.getDataSource())) {
- identifiersByDataSource.put(identifier.getDataSource(), new HashSet<>());
- }
+ identifiersByDataSource.computeIfAbsent(identifier.getDataSource(), k -> new HashSet<>());
+
identifiersByDataSource.get(identifier.getDataSource()).add(identifier);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index b7e5123..458c621 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -768,9 +768,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
}
if (determineNumPartitions) {
- if (!hllCollectors.containsKey(interval)) {
- hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
- }
+ hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
List<Object> groupKey = Rows.toGroupKey(
queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
@@ -781,9 +779,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
} else {
// we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
// for the interval and don't instantiate a HLL collector
- if (!hllCollectors.containsKey(interval)) {
- hllCollectors.put(interval, Optional.absent());
- }
+ hllCollectors.putIfAbsent(interval, Optional.absent());
}
determinePartitionsMeters.incrementProcessed();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 4d1e0bf..2caf91d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -204,87 +204,87 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
segmentIds
);
- try {
- final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
-
- // Download all segments locally.
- // Note: this requires enough local storage space to fit all of the segments, even though
- // IngestSegmentFirehose iterates over the segments in series. We may want to change this
- // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
- final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
- Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
- for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
- for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
- final DataSegment segment = chunk.getObject();
- if (!segmentFileMap.containsKey(segment)) {
- segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+ final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
+
+ // Download all segments locally.
+ // Note: this requires enough local storage space to fit all of the segments, even though
+ // IngestSegmentFirehose iterates over the segments in series. We may want to change this
+ // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
+ final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+ Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
+ for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
+ for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+ final DataSegment segment = chunk.getObject();
+
+ segmentFileMap.computeIfAbsent(segment, k -> {
+ try {
+ return segmentLoader.getSegmentFiles(segment);
}
- }
- }
+ catch (SegmentLoadingException e) {
+ throw new RuntimeException(e);
+ }
+ });
- final List<String> dims;
- if (dimensions != null) {
- dims = dimensions;
- } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
- dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
- } else {
- dims = getUniqueDimensions(
- timeLineSegments,
- inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
- );
}
+ }
- final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+ final List<String> dims;
+ if (dimensions != null) {
+ dims = dimensions;
+ } else if (inputRowParser.getParseSpec().getDimensionsSpec().hasCustomDimensions()) {
+ dims = inputRowParser.getParseSpec().getDimensionsSpec().getDimensionNames();
+ } else {
+ dims = getUniqueDimensions(
+ timeLineSegments,
+ inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions()
+ );
+ }
- final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
- Iterables.concat(
- Iterables.transform(
- timeLineSegments,
- new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>()
- {
+ final List<String> metricsList = metrics == null ? getUniqueMetrics(timeLineSegments) : metrics;
+
+ final List<WindowedStorageAdapter> adapters = Lists.newArrayList(
+ Iterables.concat(
+ Iterables.transform(
+ timeLineSegments,
+ new Function<TimelineObjectHolder<String, DataSegment>, Iterable<WindowedStorageAdapter>>() {
+ @Override
+ public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+ {
+ return
+ Iterables.transform(
+ holder.getObject(),
+ new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>() {
@Override
- public Iterable<WindowedStorageAdapter> apply(final TimelineObjectHolder<String, DataSegment> holder)
+ public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
{
- return
- Iterables.transform(
- holder.getObject(),
- new Function<PartitionChunk<DataSegment>, WindowedStorageAdapter>()
- {
- @Override
- public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
- {
- final DataSegment segment = input.getObject();
- try {
- return new WindowedStorageAdapter(
- new QueryableIndexStorageAdapter(
- indexIO.loadIndex(
- Preconditions.checkNotNull(
- segmentFileMap.get(segment),
- "File for segment %s", segment.getId()
- )
- )
- ),
- holder.getInterval()
- );
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- );
+ final DataSegment segment = input.getObject();
+ try {
+ return new WindowedStorageAdapter(
+ new QueryableIndexStorageAdapter(
+ indexIO.loadIndex(
+ Preconditions.checkNotNull(
+ segmentFileMap.get(segment),
+ "File for segment %s", segment.getId()
+ )
+ )
+ ),
+ holder.getInterval()
+ );
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
- )
- )
- );
+ );
+ }
+ }
+ )
+ )
+ );
- final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
- return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
- }
- catch (SegmentLoadingException e) {
- throw new RuntimeException(e);
- }
+ final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
+ return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
}
private long jitter(long input)
@@ -508,13 +508,14 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
// segments to olders.
// timelineSegments are sorted in order of interval
- int index = 0;
+ int[] index = {0};
for (TimelineObjectHolder<String, DataSegment> timelineHolder : Lists.reverse(timelineSegments)) {
for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
for (String metric : chunk.getObject().getMetrics()) {
- if (!uniqueMetrics.containsKey(metric)) {
- uniqueMetrics.put(metric, index++);
+ uniqueMetrics.computeIfAbsent(metric, k -> {
+ return index[0]++;
}
+ );
}
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 05f2f52..116747b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -84,7 +84,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -108,7 +107,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
/** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
- private final ConcurrentMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
private volatile boolean stopping = false;
@@ -214,309 +213,306 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
public ListenableFuture<TaskStatus> run(final Task task)
{
synchronized (tasks) {
- if (!tasks.containsKey(task.getId())) {
- tasks.put(
- task.getId(),
- new ForkingTaskRunnerWorkItem(
- task,
- exec.submit(
- new Callable<TaskStatus>()
- {
- @Override
- public TaskStatus call()
- {
- final String attemptUUID = UUID.randomUUID().toString();
- final File taskDir = taskConfig.getTaskDir(task.getId());
- final File attemptDir = new File(taskDir, attemptUUID);
-
- final ProcessHolder processHolder;
- final String childHost = node.getHost();
- int childPort = -1;
- int tlsChildPort = -1;
-
- if (node.isEnablePlaintextPort()) {
- childPort = portFinder.findUnusedPort();
+ tasks.computeIfAbsent(
+ task.getId(), k ->
+ new ForkingTaskRunnerWorkItem(
+ task,
+ exec.submit(
+ new Callable<TaskStatus>() {
+ @Override
+ public TaskStatus call()
+ {
+ final String attemptUUID = UUID.randomUUID().toString();
+ final File taskDir = taskConfig.getTaskDir(task.getId());
+ final File attemptDir = new File(taskDir, attemptUUID);
+
+ final ProcessHolder processHolder;
+ final String childHost = node.getHost();
+ int childPort = -1;
+ int tlsChildPort = -1;
+
+ if (node.isEnablePlaintextPort()) {
+ childPort = portFinder.findUnusedPort();
+ }
+
+ if (node.isEnableTlsPort()) {
+ tlsChildPort = portFinder.findUnusedPort();
+ }
+
+ final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
+
+ try {
+ final Closer closer = Closer.create();
+ try {
+ if (!attemptDir.mkdirs()) {
+ throw new IOE("Could not create directories: %s", attemptDir);
+ }
+
+ final File taskFile = new File(taskDir, "task.json");
+ final File statusFile = new File(attemptDir, "status.json");
+ final File logFile = new File(taskDir, "log");
+ final File reportsFile = new File(attemptDir, "report.json");
+
+ // time to adjust process holders
+ synchronized (tasks) {
+ final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
+
+ if (taskWorkItem.shutdown) {
+ throw new IllegalStateException("Task has been shut down!");
}
- if (node.isEnableTlsPort()) {
- tlsChildPort = portFinder.findUnusedPort();
+ if (taskWorkItem == null) {
+ log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
+ throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
}
- final TaskLocation taskLocation = TaskLocation.create(childHost, childPort, tlsChildPort);
+ if (taskWorkItem.processHolder != null) {
+ log.makeAlert("WTF?! TaskInfo already has a processHolder")
+ .addData("task", task.getId())
+ .emit();
+ throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
+ }
- try {
- final Closer closer = Closer.create();
- try {
- if (!attemptDir.mkdirs()) {
- throw new IOE("Could not create directories: %s", attemptDir);
- }
+ final List<String> command = new ArrayList<>();
+ final String taskClasspath;
+ if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
+ taskClasspath = Joiner.on(File.pathSeparator).join(
+ task.getClasspathPrefix(),
+ config.getClasspath()
+ );
+ } else {
+ taskClasspath = config.getClasspath();
+ }
- final File taskFile = new File(taskDir, "task.json");
- final File statusFile = new File(attemptDir, "status.json");
- final File logFile = new File(taskDir, "log");
- final File reportsFile = new File(attemptDir, "report.json");
-
- // time to adjust process holders
- synchronized (tasks) {
- final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
-
- if (taskWorkItem.shutdown) {
- throw new IllegalStateException("Task has been shut down!");
- }
-
- if (taskWorkItem == null) {
- log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
- throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
- }
-
- if (taskWorkItem.processHolder != null) {
- log.makeAlert("WTF?! TaskInfo already has a processHolder")
- .addData("task", task.getId())
- .emit();
- throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
- }
-
- final List<String> command = new ArrayList<>();
- final String taskClasspath;
- if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
- taskClasspath = Joiner.on(File.pathSeparator).join(
- task.getClasspathPrefix(),
- config.getClasspath()
- );
- } else {
- taskClasspath = config.getClasspath();
- }
-
- command.add(config.getJavaCommand());
- command.add("-cp");
- command.add(taskClasspath);
-
- Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
- Iterables.addAll(command, config.getJavaOptsArray());
-
- // Override task specific javaOpts
- Object taskJavaOpts = task.getContextValue(
- ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
- );
- if (taskJavaOpts != null) {
- Iterables.addAll(
- command,
- new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
- );
- }
-
- for (String propName : props.stringPropertyNames()) {
- for (String allowedPrefix : config.getAllowedPrefixes()) {
- // See https://github.com/apache/incubator-druid/issues/1841
- if (propName.startsWith(allowedPrefix)
- && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
- && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
- ) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
- propName,
- props.getProperty(propName)
- )
- );
- }
- }
- }
-
- // Override child JVM specific properties
- for (String propName : props.stringPropertyNames()) {
- if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
- propName.substring(CHILD_PROPERTY_PREFIX.length()),
- props.getProperty(propName)
- )
- );
- }
- }
-
- // Override task specific properties
- final Map<String, Object> context = task.getContext();
- if (context != null) {
- for (String propName : context.keySet()) {
- if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
- command.add(
- StringUtils.format(
- "-D%s=%s",
- propName.substring(CHILD_PROPERTY_PREFIX.length()),
- task.getContextValue(propName)
- )
- );
- }
- }
- }
-
- // Add dataSource, taskId and taskType for metrics or logging
- command.add(
- StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.DATASOURCE,
- task.getDataSource()
- )
- );
+ command.add(config.getJavaCommand());
+ command.add("-cp");
+ command.add(taskClasspath);
+
+ Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
+ Iterables.addAll(command, config.getJavaOptsArray());
+
+ // Override task specific javaOpts
+ Object taskJavaOpts = task.getContextValue(
+ ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
+ );
+ if (taskJavaOpts != null) {
+ Iterables.addAll(
+ command,
+ new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
+ );
+ }
+
+ for (String propName : props.stringPropertyNames()) {
+ for (String allowedPrefix : config.getAllowedPrefixes()) {
+ // See https://github.com/apache/incubator-druid/issues/1841
+ if (propName.startsWith(allowedPrefix)
+ && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
+ && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
+ ) {
command.add(
StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.TASK_ID,
- task.getId()
- )
+ "-D%s=%s",
+ propName,
+ props.getProperty(propName)
+ )
);
+ }
+ }
+ }
+
+ // Override child JVM specific properties
+ for (String propName : props.stringPropertyNames()) {
+ if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
+ command.add(
+ StringUtils.format(
+ "-D%s=%s",
+ propName.substring(CHILD_PROPERTY_PREFIX.length()),
+ props.getProperty(propName)
+ )
+ );
+ }
+ }
+
+ // Override task specific properties
+ final Map<String, Object> context = task.getContext();
+ if (context != null) {
+ for (String propName : context.keySet()) {
+ if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
command.add(
StringUtils.format(
- "-D%s%s=%s",
- MonitorsConfig.METRIC_DIMENSION_PREFIX,
- DruidMetrics.TASK_TYPE,
- task.getType()
- )
+ "-D%s=%s",
+ propName.substring(CHILD_PROPERTY_PREFIX.length()),
+ task.getContextValue(propName)
+ )
);
+ }
+ }
+ }
- command.add(StringUtils.format("-Ddruid.host=%s", childHost));
- command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
- command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
- /**
- * These are not enabled per default to allow the user to either set or not set them
- * Users are highly suggested to be set in druid.indexer.runner.javaOpts
- * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
- * for more information
- command.add("-XX:+UseThreadPriorities");
- command.add("-XX:ThreadPriorityPolicy=42");
- */
-
- command.add("org.apache.druid.cli.Main");
- command.add("internal");
- command.add("peon");
- command.add(taskFile.toString());
- command.add(statusFile.toString());
- command.add(reportsFile.toString());
- String nodeType = task.getNodeType();
- if (nodeType != null) {
- command.add("--nodeType");
- command.add(nodeType);
- }
-
- if (!taskFile.exists()) {
- jsonMapper.writeValue(taskFile, task);
- }
-
- log.info("Running command: %s", Joiner.on(" ").join(command));
- taskWorkItem.processHolder = new ProcessHolder(
- new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
- logFile,
- taskLocation.getHost(),
- taskLocation.getPort(),
- taskLocation.getTlsPort()
- );
+ // Add dataSource, taskId and taskType for metrics or logging
+ command.add(
+ StringUtils.format(
+ "-D%s%s=%s",
+ MonitorsConfig.METRIC_DIMENSION_PREFIX,
+ DruidMetrics.DATASOURCE,
+ task.getDataSource()
+ )
+ );
+ command.add(
+ StringUtils.format(
+ "-D%s%s=%s",
+ MonitorsConfig.METRIC_DIMENSION_PREFIX,
+ DruidMetrics.TASK_ID,
+ task.getId()
+ )
+ );
+ command.add(
+ StringUtils.format(
+ "-D%s%s=%s",
+ MonitorsConfig.METRIC_DIMENSION_PREFIX,
+ DruidMetrics.TASK_TYPE,
+ task.getType()
+ )
+ );
+
+ command.add(StringUtils.format("-Ddruid.host=%s", childHost));
+ command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
+ command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
+ /**
+ * These are not enabled per default to allow the user to either set or not set them
+ * Users are highly suggested to be set in druid.indexer.runner.javaOpts
+ * See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
+ * for more information
+ command.add("-XX:+UseThreadPriorities");
+ command.add("-XX:ThreadPriorityPolicy=42");
+ */
+
+ command.add("org.apache.druid.cli.Main");
+ command.add("internal");
+ command.add("peon");
+ command.add(taskFile.toString());
+ command.add(statusFile.toString());
+ command.add(reportsFile.toString());
+ String nodeType = task.getNodeType();
+ if (nodeType != null) {
+ command.add("--nodeType");
+ command.add(nodeType);
+ }
- processHolder = taskWorkItem.processHolder;
- processHolder.registerWithCloser(closer);
- }
+ if (!taskFile.exists()) {
+ jsonMapper.writeValue(taskFile, task);
+ }
- TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
- TaskRunnerUtils.notifyStatusChanged(
- listeners,
- task.getId(),
- TaskStatus.running(task.getId())
- );
+ log.info("Running command: %s", Joiner.on(" ").join(command));
+ taskWorkItem.processHolder = new ProcessHolder(
+ new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
+ logFile,
+ taskLocation.getHost(),
+ taskLocation.getPort(),
+ taskLocation.getTlsPort()
+ );
+
+ processHolder = taskWorkItem.processHolder;
+ processHolder.registerWithCloser(closer);
+ }
- log.info("Logging task %s output to: %s", task.getId(), logFile);
- boolean runFailed = true;
+ TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
+ TaskRunnerUtils.notifyStatusChanged(
+ listeners,
+ task.getId(),
+ TaskStatus.running(task.getId())
+ );
- final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
+ log.info("Logging task %s output to: %s", task.getId(), logFile);
+ boolean runFailed = true;
- // This will block for a while. So we append the thread information with more details
- final String priorThreadName = Thread.currentThread().getName();
- Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
+ final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
- try (final OutputStream toLogfile = logSink.openStream()) {
- ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
- final int statusCode = processHolder.process.waitFor();
- log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
- if (statusCode == 0) {
- runFailed = false;
- }
- }
- finally {
- Thread.currentThread().setName(priorThreadName);
- // Upload task logs
- taskLogPusher.pushTaskLog(task.getId(), logFile);
- if (reportsFile.exists()) {
- taskLogPusher.pushTaskReports(task.getId(), reportsFile);
- }
- }
+ // This will block for a while. So we append the thread information with more details
+ final String priorThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
- TaskStatus status;
- if (!runFailed) {
- // Process exited successfully
- status = jsonMapper.readValue(statusFile, TaskStatus.class);
- } else {
- // Process exited unsuccessfully
- status = TaskStatus.failure(task.getId());
- }
+ try (final OutputStream toLogfile = logSink.openStream()) {
+ ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
+ final int statusCode = processHolder.process.waitFor();
+ log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
+ if (statusCode == 0) {
+ runFailed = false;
+ }
+ }
+ finally {
+ Thread.currentThread().setName(priorThreadName);
+ // Upload task logs
+ taskLogPusher.pushTaskLog(task.getId(), logFile);
+ if (reportsFile.exists()) {
+ taskLogPusher.pushTaskReports(task.getId(), reportsFile);
+ }
+ }
- TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
- return status;
- }
- catch (Throwable t) {
- throw closer.rethrow(t);
- }
- finally {
- closer.close();
- }
+ TaskStatus status;
+ if (!runFailed) {
+ // Process exited successfully
+ status = jsonMapper.readValue(statusFile, TaskStatus.class);
+ } else {
+ // Process exited unsuccessfully
+ status = TaskStatus.failure(task.getId());
+ }
+
+ TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
+ return status;
+ }
+ catch (Throwable t) {
+ throw closer.rethrow(t);
+ }
+ finally {
+ closer.close();
+ }
+ }
+ catch (Throwable t) {
+ log.info(t, "Exception caught during execution");
+ throw new RuntimeException(t);
+ }
+ finally {
+ try {
+ synchronized (tasks) {
+ final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
+ if (taskWorkItem != null && taskWorkItem.processHolder != null) {
+ taskWorkItem.processHolder.process.destroy();
}
- catch (Throwable t) {
- log.info(t, "Exception caught during execution");
- throw new RuntimeException(t);
+ if (!stopping) {
+ saveRunningTasks();
}
- finally {
- try {
- synchronized (tasks) {
- final ForkingTaskRunnerWorkItem taskWorkItem = tasks.remove(task.getId());
- if (taskWorkItem != null && taskWorkItem.processHolder != null) {
- taskWorkItem.processHolder.process.destroy();
- }
- if (!stopping) {
- saveRunningTasks();
- }
- }
+ }
- if (node.isEnablePlaintextPort()) {
- portFinder.markPortUnused(childPort);
- }
- if (node.isEnableTlsPort()) {
- portFinder.markPortUnused(tlsChildPort);
- }
+ if (node.isEnablePlaintextPort()) {
+ portFinder.markPortUnused(childPort);
+ }
+ if (node.isEnableTlsPort()) {
+ portFinder.markPortUnused(tlsChildPort);
+ }
- try {
- if (!stopping && taskDir.exists()) {
- log.info("Removing task directory: %s", taskDir);
- FileUtils.deleteDirectory(taskDir);
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to delete task directory")
- .addData("taskDir", taskDir.toString())
- .addData("task", task.getId())
- .emit();
- }
- }
- catch (Exception e) {
- log.error(e, "Suppressing exception caught while cleaning up task");
- }
+ try {
+ if (!stopping && taskDir.exists()) {
+ log.info("Removing task directory: %s", taskDir);
+ FileUtils.deleteDirectory(taskDir);
}
}
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to delete task directory")
+ .addData("taskDir", taskDir.toString())
+ .addData("task", task.getId())
+ .emit();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Suppressing exception caught while cleaning up task");
}
- )
+ }
+ }
+ }
)
- );
- }
+ )
+ );
saveRunningTasks();
return tasks.get(task.getId()).getResult();
}
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
index 83fcc05..a5354b1 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSupervisorManager.java
@@ -166,9 +166,7 @@ public class SQLMetadataSupervisorManager implements MetadataSupervisorManager
{
try {
String specId = pair.lhs;
- if (!retVal.containsKey(specId)) {
- retVal.put(specId, new ArrayList<>());
- }
+ retVal.putIfAbsent(specId, new ArrayList<>());
retVal.get(specId).add(pair.rhs);
return retVal;
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 3787cc6..9255e7c 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -549,9 +549,7 @@ public class DataSourcesResource
continue;
}
- if (!tierDistinctSegments.containsKey(tier)) {
- tierDistinctSegments.put(tier, new HashSet<>());
- }
+ tierDistinctSegments.computeIfAbsent(tier, k -> new HashSet<>());
long dataSourceSegmentSize = 0;
for (DataSegment dataSegment : druidDataSource.getSegments()) {
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index de24926..8170014 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2198,9 +2198,7 @@ public class CachingClusteredClientTest
serverExpectationList.add(serverExpectations);
for (int j = 0; j < numChunks; ++j) {
DruidServer lastServer = servers[random.nextInt(servers.length)];
- if (!serverExpectations.containsKey(lastServer)) {
- serverExpectations.put(lastServer, new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
- }
+ serverExpectations.computeIfAbsent(lastServer, server -> new ServerExpectations(lastServer, makeMock(mocks, QueryRunner.class)));
DataSegment mockSegment = makeMock(mocks, DataSegment.class);
ServerExpectation<Object> expectation = new ServerExpectation<>(
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 9c3e9cc..c7475d9 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -428,9 +428,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
synchronized (counters) {
DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp());
final long timestampTruncated = dateTimeTruncated.getMillis();
- if (!counters.containsKey(timestampTruncated)) {
- counters.put(timestampTruncated, new AtomicInteger());
- }
+ counters.putIfAbsent(timestampTruncated, new AtomicInteger());
final int partitionNum = counters.get(timestampTruncated).getAndIncrement();
return new SegmentIdWithShardSpec(
dataSource,
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index a9c78c4..80cfd04 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -122,9 +122,7 @@ public class SqlLifecycle
if (queryContext != null) {
newContext.putAll(queryContext);
}
- if (!newContext.containsKey(PlannerContext.CTX_SQL_QUERY_ID)) {
- newContext.put(PlannerContext.CTX_SQL_QUERY_ID, UUID.randomUUID().toString());
- }
+ newContext.computeIfAbsent(PlannerContext.CTX_SQL_QUERY_ID, k -> UUID.randomUUID().toString());
return newContext;
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index af0e894..4d1206b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -78,9 +78,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
)
{
final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
- if (!timelines.containsKey(descriptor.getDataSource())) {
- timelines.put(descriptor.getDataSource(), new VersionedIntervalTimeline<>(Ordering.natural()));
- }
+ timelines.computeIfAbsent(descriptor.getDataSource(), datasource -> new VersionedIntervalTimeline<>(Ordering.natural()));
final VersionedIntervalTimeline<String, Segment> timeline = timelines.get(descriptor.getDataSource());
timeline.add(descriptor.getInterval(), descriptor.getVersion(), descriptor.getShardSpec().createChunk(segment));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org