Posted to commits@druid.apache.org by cw...@apache.org on 2019/07/18 06:02:28 UTC

[incubator-druid] branch master updated: Refactoring to use `CollectionUtils.mapValues` (#8059)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new da16144  Refactoring to use `CollectionUtils.mapValues` (#8059)
da16144 is described below

commit da16144495ed04f26c55935489257fee0a3e8ff2
Author: Surekha <su...@imply.io>
AuthorDate: Wed Jul 17 23:02:22 2019 -0700

    Refactoring to use `CollectionUtils.mapValues` (#8059)
    
    * doc updates and changes to use the CollectionUtils.mapValues utility method
    
    * Add Structural Search patterns to intelliJ
    
    * refactoring from PR comments
    
    * put -> putIfAbsent
    
    * do single key lookup
---
 .idea/inspectionProfiles/Druid.xml                 | 22 +++++++++++++++++++++
 .../org/apache/druid/utils/CollectionUtils.java    | 23 ++++++++++++++++++++++
 .../IncrementalPublishingKafkaIndexTaskRunner.java | 12 +++++------
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  2 ++
 .../apache/druid/indexing/overlord/TaskQueue.java  | 15 +++-----------
 .../SeekableStreamIndexTaskRunner.java             |  7 +++++--
 .../aggregation/post/ExpressionPostAggregator.java |  9 ++-------
 .../appenderator/BaseAppenderatorDriver.java       | 10 ++--------
 .../org/apache/druid/server/SegmentManager.java    |  9 +++------
 .../druid/server/coordinator/DruidCluster.java     | 18 +++++++----------
 .../server/coordinator/cost/ClusterCostCache.java  |  7 ++-----
 .../server/coordinator/cost/ServerCostCache.java   |  7 ++-----
 12 files changed, 78 insertions(+), 63 deletions(-)
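
Before and after, in a minimal sketch drawn from the TaskQueue.java hunk below (assumes imports of java.util.Map, java.util.stream.Collectors, java.util.concurrent.atomic.AtomicLong, and org.apache.druid.utils.CollectionUtils):

    // Before: an eager stream-collect that keeps every key and only transforms the values.
    Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
                                                      .stream()
                                                      .collect(Collectors.toMap(
                                                          Map.Entry::getKey,
                                                          e -> e.getValue().get()
                                                      ));

    // After: the same eager transformation via the shared utility.
    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);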

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index bba3782..bac71e7 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -311,6 +311,28 @@
         <constraint name="k" within="" contains="" />
         <constraint name="v" within="" contains="" />
       </searchConfiguration>
+      <searchConfiguration name="Use CollectionUtils.mapValues(Map&lt;K,V&gt;, Function&lt;V,V2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap($k$ -&gt; $k$.getKey(), $y$))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="x" within="" contains="" />
+        <constraint name="y" within="" contains="" />
+        <constraint name="k" within="" contains="" />
+        <constraint name="__context__" target="true" within="" contains="" />
+      </searchConfiguration>
+      <searchConfiguration name="Use CollectionUtils.mapValues(Map&lt;k,v&gt;, Function&lt;v,v2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap(Entry::getKey, $y$))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="x" within="" contains="" />
+        <constraint name="y" within="" contains="" />
+        <constraint name="__context__" target="true" within="" contains="" />
+      </searchConfiguration>
+      <searchConfiguration name="Use CollectionUtils.mapKeys(Map&lt;K,V&gt;, Function&lt;K,K2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap($y$, $v$ -&gt; $v$.getValue()))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="x" within="" contains="" />
+        <constraint name="y" within="" contains="" />
+        <constraint name="__context__" target="true" within="" contains="" />
+        <constraint name="v" within="" contains="" />
+      </searchConfiguration>
+      <searchConfiguration name="Use CollectionUtils.mapKeys(Map&lt;k,v&gt;, Function&lt;k,k2&gt;)" text="$x$.entrySet().stream().collect(Collectors.toMap($y$, Map.Entry::getValue))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="x" within="" contains="" />
+        <constraint name="y" within="" contains="" />
+        <constraint name="__context__" target="true" within="" contains="" />
+      </searchConfiguration>
       <searchConfiguration name="Use collections constructors directly" text="Stream.of($x$).collect(Collectors.$m$())" recursive="true" caseInsensitive="true" type="JAVA">
         <constraint name="__context__" target="true" within="" contains="" />
         <constraint name="m" within="" contains="" />
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
index deb4eaf..530feb8 100644
--- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -21,10 +21,12 @@ package org.apache.druid.utils;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import org.apache.druid.java.util.common.ISE;
 
 import java.util.AbstractCollection;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Spliterator;
@@ -85,6 +87,8 @@ public final class CollectionUtils
   /**
    * Returns a transformed map from the given input map where the value is modified based on the given valueMapper
    * function.
+   * Unlike {@link Maps#transformValues}, which returns a lazy map view, this method applies the mapping
+   * function eagerly to all key-value pairs in the source map and returns a new {@link HashMap}.
    */
   public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
   {
@@ -93,6 +97,25 @@ public final class CollectionUtils
     return result;
   }
 
+  /**
+   * Returns a transformed map from the given input map where the key is modified based on the given keyMapper
+   * function. This method fails if keys collide after applying the given keyMapper function, throwing
+   * an IllegalStateException.
+   *
+   * @throws ISE if key collisions occur while applying the specified keyMapper
+   */
+  public static <K, V, K2> Map<K2, V> mapKeys(Map<K, V> map, Function<K, K2> keyMapper)
+  {
+    final Map<K2, V> result = Maps.newHashMapWithExpectedSize(map.size());
+    map.forEach((k, v) -> {
+      final K2 k2 = keyMapper.apply(k);
+      if (result.putIfAbsent(k2, v) != null) {
+        throw new ISE("Conflicting key[%s] calculated via keyMapper for original key[%s]", k2, k);
+      }
+    });
+    return result;
+  }
+
   private CollectionUtils()
   {
   }
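
A usage sketch of the two utilities above; the maps and mappers here are illustrative, not code from this patch (assumes Guava's ImmutableMap on the classpath, as elsewhere in Druid):

    Map<String, Integer> sizes = ImmutableMap.of("alpha", 1, "beta", 2);

    // mapValues eagerly copies into a new HashMap; Maps.transformValues would return a lazy view instead.
    Map<String, Integer> doubled = CollectionUtils.mapValues(sizes, v -> v * 2);  // {alpha=2, beta=4}

    // mapKeys throws an ISE when two source keys collapse to the same mapped key:
    // here both "ab" and "cd" map to length 2.
    CollectionUtils.mapKeys(ImmutableMap.of("ab", 1, "cd", 2), String::length);   // throws ISE
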
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index bf0580cd..3b34454 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -39,6 +39,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.common.TopicPartition;
 
@@ -54,7 +55,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Kafka indexing task runner supporting incremental segments publishing
@@ -163,12 +163,10 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
     }
 
     if (doReset) {
-      sendResetRequestAndWait(resetPartitions.entrySet()
-                                             .stream()
-                                             .collect(Collectors.toMap(x -> StreamPartition.of(
-                                                 x.getKey().topic(),
-                                                 x.getKey().partition()
-                                             ), Map.Entry::getValue)), taskToolbox);
+      sendResetRequestAndWait(CollectionUtils.mapKeys(resetPartitions, streamPartition -> StreamPartition.of(
+          streamPartition.topic(),
+          streamPartition.partition()
+      )), taskToolbox);
     } else {
       log.warn("Retrying in %dms", task.getPollRetryMs());
       pollRetryLock.lockInterruptibly();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index c769617..e9c7d2a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -266,6 +266,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 
 
   @Override
+  // Suppress the inspection suggesting CollectionUtils.mapValues(), since the valueMapper depends on the map key here
+  @SuppressWarnings("SSBasedInspection")
   protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets)
   {
     return currentOffsets
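
For contrast, a hypothetical sketch of why the suppression above is needed (latestSequenceFromStream is an assumed lookup, not shown in this hunk): the valueMapper passed to CollectionUtils.mapValues only ever sees the value, so a lag computation that needs the partition key keeps the stream-collect form:

    return currentOffsets
        .entrySet()
        .stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> latestSequenceFromStream.get(e.getKey()) - e.getValue()
        ));
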
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index b820b23..ee3a56d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -45,6 +45,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.utils.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -621,12 +622,7 @@ public class TaskQueue
 
   public Map<String, Long> getSuccessfulTaskCount()
   {
-    Map<String, Long> total = totalSuccessfulTaskCount.entrySet()
-                                                      .stream()
-                                                      .collect(Collectors.toMap(
-                                                          Map.Entry::getKey,
-                                                          e -> e.getValue().get()
-                                                      ));
+    Map<String, Long> total = CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
     Map<String, Long> delta = getDeltaValues(total, prevTotalSuccessfulTaskCount);
     prevTotalSuccessfulTaskCount = total;
     return delta;
@@ -634,12 +630,7 @@ public class TaskQueue
 
   public Map<String, Long> getFailedTaskCount()
   {
-    Map<String, Long> total = totalFailedTaskCount.entrySet()
-                                                  .stream()
-                                                  .collect(Collectors.toMap(
-                                                      Map.Entry::getKey,
-                                                      e -> e.getValue().get()
-                                                  ));
+    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, AtomicLong::get);
     Map<String, Long> delta = getDeltaValues(total, prevTotalFailedTaskCount);
     prevTotalFailedTaskCount = total;
     return delta;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index f7de882..a9e8941 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -80,6 +80,7 @@ import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.utils.CircularBuffer;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -1274,8 +1275,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   )
       throws IOException
   {
-    Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = outOfRangePartitions
-        .entrySet().stream().collect(Collectors.toMap(x -> x.getKey().getPartitionId(), Map.Entry::getValue));
+    Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap = CollectionUtils.mapKeys(
+        outOfRangePartitions,
+        StreamPartition::getPartitionId
+    );
 
     boolean result = taskToolbox
         .getTaskActionClient()
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
index 281356c..cfd6578 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
@@ -35,13 +35,13 @@ import org.apache.druid.math.expr.Parser;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
 import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.utils.CollectionUtils;
 
 import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class ExpressionPostAggregator implements PostAggregator
 {
@@ -187,12 +187,7 @@ public class ExpressionPostAggregator implements PostAggregator
         expression,
         ordering,
         macroTable,
-        aggregators.entrySet().stream().collect(
-            Collectors.toMap(
-                entry -> entry.getKey(),
-                entry -> entry.getValue()::finalizeComputation
-            )
-        ),
+        CollectionUtils.mapValues(aggregators, aggregatorFactory -> obj -> aggregatorFactory.finalizeComputation(obj)),
         parsed,
         dependentFields
     );
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index bfabeed..9ef2420 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -689,14 +690,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
                 )
             )
         ),
-        snapshot.entrySet()
-                .stream()
-                .collect(
-                    Collectors.toMap(
-                        Entry::getKey,
-                        e -> e.getValue().lastSegmentId
-                    )
-                ),
+        CollectionUtils.mapValues(snapshot, segmentsForSequence -> segmentsForSequence.lastSegmentId),
         committer.getMetadata()
     );
 
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index ecbb0d4..529a37f 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -32,12 +32,11 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
+import org.apache.druid.utils.CollectionUtils;
 
 import javax.annotation.Nullable;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 
 /**
  * This class is responsible for managing data sources and their states like timeline, total segment size, and number of
@@ -115,8 +114,7 @@ public class SegmentManager
    */
   public Map<String, Long> getDataSourceSizes()
   {
-    return dataSources.entrySet().stream()
-                      .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getTotalSegmentSize()));
+    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getTotalSegmentSize);
   }
 
   /**
@@ -127,8 +125,7 @@ public class SegmentManager
    */
   public Map<String, Long> getDataSourceCounts()
   {
-    return dataSources.entrySet().stream()
-                      .collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getNumSegments()));
+    return CollectionUtils.mapValues(dataSources, SegmentManager.DataSourceState::getNumSegments);
   }
 
   public boolean isSegmentCached(final DataSegment segment)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
index 2001b3f..6f52bf8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCluster.java
@@ -36,7 +36,6 @@ import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.stream.Collectors;
 
 /**
  * Contains a representation of the current state of the cluster by tier.
@@ -69,16 +68,13 @@ public class DruidCluster
   )
   {
     this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
-    this.historicals = historicals
-        .entrySet()
-        .stream()
-        .collect(
-            Collectors.toMap(
-                Map.Entry::getKey,
-                (Map.Entry<String, Iterable<ServerHolder>> e) ->
-                    CollectionUtils.newTreeSet(Comparator.reverseOrder(), e.getValue())
-            )
-        );
+    this.historicals = CollectionUtils.mapValues(
+        historicals,
+        holders -> CollectionUtils.newTreeSet(
+            Comparator.reverseOrder(),
+            holders
+        )
+    );
   }
 
   public void add(ServerHolder serverHolder)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java
index 63bf3c8..84bd317 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ClusterCostCache.java
@@ -21,11 +21,11 @@ package org.apache.druid.server.coordinator.cost;
 
 import com.google.common.base.Preconditions;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class ClusterCostCache
 {
@@ -82,10 +82,7 @@ public class ClusterCostCache
     public ClusterCostCache build()
     {
       return new ClusterCostCache(
-          serversCostCache
-              .entrySet()
-              .stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
+          CollectionUtils.mapValues(serversCostCache, ServerCostCache.Builder::build)
       );
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java
index 1310b47..7897569 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/cost/ServerCostCache.java
@@ -21,10 +21,10 @@ package org.apache.druid.server.coordinator.cost;
 
 import com.google.common.base.Preconditions;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.utils.CollectionUtils;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class ServerCostCache
 {
@@ -89,10 +89,7 @@ public class ServerCostCache
     {
       return new ServerCostCache(
           allSegmentsCostCache.build(),
-          segmentsPerDataSource
-              .entrySet()
-              .stream()
-              .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()))
+          CollectionUtils.mapValues(segmentsPerDataSource, SegmentsCostCache.Builder::build)
       );
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org