You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/07/08 18:01:04 UTC

[storm] branch master updated: STORM-3422: Make the TupleCaptureBolt thread-safe

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 60a93ad  STORM-3422: Make the TupleCaptureBolt thread-safe
     new 727caf6  Merge pull request #3034 from JanecekPetr/3422-TupleCaptureBolt_is_not_thread-safe
60a93ad is described below

commit 60a93add93bc4e1d63ad6d3356a81626ace00920
Author: Petr Janeček <pe...@anritsu.com>
AuthorDate: Mon Jul 8 19:06:32 2019 +0200

    STORM-3422: Make the TupleCaptureBolt thread-safe
---
 .../org/apache/storm/testing/TupleCaptureBolt.java | 33 ++++++++++++++--------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
index 19bdf86..6d07372 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
@@ -17,22 +17,29 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 
-
 public class TupleCaptureBolt implements IRichBolt {
-    public static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<>();
 
-    private String name;
+    /*
+     * Even though normally bolts do not need to care about thread safety, this particular bolt is different.
+     * It maintains a static field that is prepopulated before the topology starts, is written into by the topology,
+     * and is then read from after the topology is completed - all of this by potentially different threads.
+     */
+
+    private static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new ConcurrentHashMap<>();
+
+    private final String name;
     private OutputCollector collector;
 
     public TupleCaptureBolt() {
         name = UUID.randomUUID().toString();
-        emitted_tuples.put(name, new HashMap<String, List<FixedTuple>>());
+        emitted_tuples.put(name, new ConcurrentHashMap<String, List<FixedTuple>>());
     }
 
     @Override
@@ -43,11 +50,14 @@ public class TupleCaptureBolt implements IRichBolt {
     @Override
     public void execute(Tuple input) {
         String component = input.getSourceComponent();
-        Map<String, List<FixedTuple>> captured = emitted_tuples.get(name);
-        if (!captured.containsKey(component)) {
-            captured.put(component, new ArrayList<FixedTuple>());
-        }
-        captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
+        emitted_tuples.get(name)
+            .compute(component, (String key, List<FixedTuple> tuples) -> {
+                if (tuples == null) {
+                    tuples = new ArrayList<>();
+                }
+                tuples.add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
+                return tuples;
+            });
         collector.ack(input);
     }
 
@@ -64,8 +74,9 @@ public class TupleCaptureBolt implements IRichBolt {
     }
 
     public Map<String, List<FixedTuple>> getAndClearResults() {
-        Map<String, List<FixedTuple>> ret = new HashMap<>(emitted_tuples.get(name));
-        emitted_tuples.get(name).clear();
+        Map<String, List<FixedTuple>> results = emitted_tuples.get(name);
+        Map<String, List<FixedTuple>> ret = new HashMap<>(results);
+        results.clear();
         return ret;
     }