You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/18 06:02:28 UTC
[incubator-druid] branch master updated: Refactoring to use `CollectionUtils.mapValues` (#8059)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new da16144 Refactoring to use `CollectionUtils.mapValues` (#8059)
da16144 is described below
commit da16144495ed04f26c55935489257fee0a3e8ff2
Author: Surekha <su...@imply.io>
AuthorDate: Wed Jul 17 23:02:22 2019 -0700
Refactoring to use `CollectionUtils.mapValues` (#8059)
* doc updates and changes to use the CollectionUtils.mapValues utility method
* Add Structural Search patterns to IntelliJ
* refactoring from PR comments
* put -> putIfAbsent
* do single key lookup
---
.idea/inspectionProfiles/Druid.xml | 22 +++++++++++++++++++++
.../org/apache/druid/utils/CollectionUtils.java | 23 ++++++++++++++++++++++
.../IncrementalPublishingKafkaIndexTaskRunner.java | 12 +++++------
.../indexing/kafka/supervisor/KafkaSupervisor.java | 2 ++
.../apache/druid/indexing/overlord/TaskQueue.java | 15 +++-----------
.../SeekableStreamIndexTaskRunner.java | 7 +++++--
.../aggregation/post/ExpressionPostAggregator.java | 9 ++-------
.../appenderator/BaseAppenderatorDriver.java | 10 ++--------
.../org/apache/druid/server/SegmentManager.java | 9 +++------
.../druid/server/coordinator/DruidCluster.java | 18 +++++++----------
.../server/coordinator/cost/ClusterCostCache.java | 7 ++-----
.../server/coordinator/cost/ServerCostCache.java | 7 ++-----
12 files changed, 78 insertions(+), 63 deletions(-)
diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index bba3782..bac71e7 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -311,6 +311,28 @@
<constraint name="k" within="" contains="" />
<constraint name="v" within="" contains="" />
</searchConfiguration>
+ <searchConfiguration name="Use CollectionUtils.mapValues(Map<K,V>, Function<V,V2>)" text="$x$.entrySet().stream().collect(Collectors.toMap($k$ -> $k$.getKey(), $y$))" recursive="true" caseInsensitive="true" type="JAVA">
+ <constraint name="x" within="" contains="" />
+ <constraint name="y" within="" contains="" />
+ <constraint name="k" within="" contains="" />
+ <constraint name="__context__" target="true" within="" contains="" />
+ </searchConfiguration>
+ <searchConfiguration name="Use CollectionUtils.mapValues(Map<k,v>, Function<v,v2>)" text="$x$.entrySet().stream().collect(Collectors.toMap(Entry::getKey, $y$))" recursive="true" caseInsensitive="true" type="JAVA">
+ <constraint name="x" within="" contains="" />
+ <constraint name="y" within="" contains="" />
+ <constraint name="__context__" target="true" within="" contains="" />
+ </searchConfiguration>
+ <searchConfiguration name="Use CollectionUtils.mapKeys(Map<K,V>, Function<K,K2>)" text="$x$.entrySet().stream().collect(Collectors.toMap($y$, $v$ -> $v$.getValue()))" recursive="true" caseInsensitive="true" type="JAVA">
+ <constraint name="x" within="" contains="" />
+ <constraint name="y" within="" contains="" />
+ <constraint name="__context__" target="true" within="" contains="" />
+ <constraint name="v" within="" contains="" />
+ </searchConfiguration>
+ <searchConfiguration name="Use CollectionUtils.mapKeys(Map<k,v>, Function<k,k2>)" text="$x$.entrySet().stream().collect(Collectors.toMap($y$, Map.Entry::getValue))" recursive="true" caseInsensitive="true" type="JAVA">
+ <constraint name="x" within="" contains="" />
+ <constraint name="y" within="" contains="" />
+ <constraint name="__context__" target="true" within="" contains="" />
+ </searchConfiguration>
<searchConfiguration name="Use collections constructors directly" text="Stream.of($x$).collect(Collectors.$m$())" recursive="true" caseInsensitive="true" type="JAVA">
<constraint name="__context__" target="true" within="" contains="" />
<constraint name="m" within="" contains="" />
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
index deb4eaf..530feb8 100644
--- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -21,10 +21,12 @@ package org.apache.druid.utils;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.druid.java.util.common.ISE;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterator;
@@ -85,6 +87,8 @@ public final class CollectionUtils
/**
* Returns a transformed map from the given input map where the value is modified based on the given valueMapper
* function.
+ * Unlike {@link Maps#transformValues}, this method applies the mapping function eagerly to all key-value pairs
+ * in the source map and returns a new {@link HashMap}, while {@link Maps#transformValues} returns a lazy map view.
*/
public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
{
@@ -93,6 +97,25 @@ public final class CollectionUtils
return result;
}
+ /**
+ * Returns a transformed map from the given input map where the key is modified based on the given keyMapper
+ * function. This method fails if keys collide after applying the given keyMapper function and
+ * throws an IllegalStateException.
+ *
+ * @throws ISE if key collisions occur while applying specified keyMapper
+ */
+ public static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMapper)
+ {
+ final Map<K2, V> result = Maps.newHashMapWithExpectedSize(map.size());
+ map.forEach((k, v) -> {
+ final K2 k2 = keyMapper.apply(k);
+ if (result.putIfAbsent(k2, v) != null) { // NOTE(review): putIfAbsent also returns null when the existing value is null, so a collision with a null-valued entry goes undetected and that entry is replaced
+ throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k);
+ }
+ });
+ return result;
+ }
+
private CollectionUtils()
{
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index bf0580cd..3b34454 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
@@ -54,7 +55,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* Kafka indexing task runner supporting incremental segments publishing
@@ -163,12 +163,10 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
}
if (doReset) {
- sendResetRequestAndWait(resetPartitions.entrySet()
- .stream()
- .collect(Collectors.toMap(x -> StreamPartition.of(
- x.getKey().topic(),
- x.getKey().partition()
- ), Map.Entry::getValue)), taskToolbox);
+ sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, streamPartition -> StreamPartition.of(
+ streamPartition.topic(),
+ streamPartition.partition()
+ )), taskToolbox);
} else {
log.warn("Retrying in %dms", task.getPollRetryMs());
pollRetryLock.lockInterruptibly();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index c769617..e9c7d2a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -266,6 +266,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
@Override
+ // suppress the inspection suggesting CollectionUtils.mapValues(), since the value mapping here depends on the map key
+ @SuppressWarnings("SSBasedInspection")
protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
{
return currentOffsets
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index b820b23..ee3a56d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -45,6 +45,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.Collection;
@@ -621,12 +622,7 @@ public class TaskQueue
public Map<String, Long> getSuccessfulTaskCount()
{
- Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
- .stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> e.getValue().get()
- ));
+ Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
prevTotalSuccessfulTaskCount = total;
return delta;
@@ -634,12 +630,7 @@ public class TaskQueue
public Map<String, Long> getFailedTaskCount()
{
- Map<String, Long> total = totalFailedTaskCount.entrySet()
- .stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> e.getValue().get()
- ));
+ Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
prevTotalFailedTaskCount = total;
return delta;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f7de882..a9e8941 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -80,6 +80,7 @@ import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -1274,8 +1275,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
)
throws IOException
{
- Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = outOfRangePartitions
- .entrySet().stream().collect(Collectors.toMap(x -> x.getKey().getPartitionId(), Map.Entry::getValue));
+ Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = CollectionUtils.mapKeys(
+ outOfRangePartitions,
+ StreamPartition::getPartitionId
+ );
boolean result = taskToolbox
.getTaskActionClient()
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
index 281356c..cfd6578 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
@@ -35,13 +35,13 @@ import org.apache.druid.math.expr.Parser;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
public class ExpressionPostAggregator implements PostAggregator
{
@@ -187,12 +187,7 @@ public class ExpressionPostAggregator implements PostAggregator
expression,
ordering,
macroTable,
- aggregators.entrySet().stream().collect(
- Collectors.toMap(
- entry -> entry.getKey(),
- entry -> entry.getValue()::finalizeComputation
- )
- ),
+ CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj)),
parsed,
dependentFields
);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index bfabeed..9ef2420 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -689,14 +690,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
)
)
),
- snapshot.entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- Entry::getKey,
- e -> e.getValue().lastSegmentId
- )
- ),
+ CollectionUtils.mapValues(snapshot, segmentsForSequence -> segmentsForSequence.lastSegmentId),
committer.getMetadata()
);
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index ecbb0d4..529a37f 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -32,12 +32,11 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
+import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
/**
* This class is responsible for managing data sources and their states like timeline, total segment size, and number of
@@ -115,8 +114,7 @@ public class SegmentManager
*/
public Map<String, Long> getDataSourceSizes()
{
- return dataSources.entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize()));
+ return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
}
/**
@@ -127,8 +125,7 @@ public class SegmentManager
*/
public Map<String, Long> getDataSourceCounts()
{
- return dataSources.entrySet().stream()
- .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments()));
+ return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
}
public boolean isSegmentCached(final DataSegment segment)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 2001b3f..6f52bf8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -36,7 +36,6 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
-import java.util.stream.Collectors;
/**
* Contains a representation of the current state of the cluster by tier.
@@ -69,16 +68,13 @@ public class DruidCluster
)
{
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
- this.historicals = historicals
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- (Map.Entry<String, Iterable<ServerHolder>> e) ->
- CollectionUtils.newTreeSet(Comparator.reverseOrder(), e.getValue())
- )
- );
+ this.historicals = CollectionUtils.mapValues(
+ historicals,
+ holders -> CollectionUtils.newTreeSet(
+ Comparator.reverseOrder(),
+ holders
+ )
+ );
}
public void add(ServerHolder serverHolder)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java
index 63bf3c8..84bd317 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java
@@ -21,11 +21,11 @@ package org.apache.druid.server.coordinator.cost;
import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
public class ClusterCostCache
{
@@ -82,10 +82,7 @@ public class ClusterCostCache
public ClusterCostCache build()
{
return new ClusterCostCache(
- serversCostCache
- .entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
+ CollectionUtils.mapValues(serversCostCache, ServerCostCache.Builder::build)
);
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java
index 1310b47..7897569 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java
@@ -21,10 +21,10 @@ package org.apache.druid.server.coordinator.cost;
import com.google.common.base.Preconditions;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
import java.util.HashMap;
import java.util.Map;
-import java.util.stream.Collectors;
public class ServerCostCache
{
@@ -89,10 +89,7 @@ public class ServerCostCache
{
return new ServerCostCache(
allSegmentsCostCache.build(),
- segmentsPerDataSource
- .entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
+ CollectionUtils.mapValues(segmentsPerDataSource, SegmentsCostCache.Builder::build)
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org