You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/07/08 18:01:04 UTC
[storm] branch master updated: STORM-3422: Make the TupleCaptureBolt thread-safe
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 60a93ad STORM-3422: Make the TupleCaptureBolt thread-safe
new 727caf6 Merge pull request #3034 from JanecekPetr/3422-TupleCaptureBolt_is_not_thread-safe
60a93ad is described below
commit 60a93add93bc4e1d63ad6d3356a81626ace00920
Author: Petr Janeček <pe...@anritsu.com>
AuthorDate: Mon Jul 8 19:06:32 2019 +0200
STORM-3422: Make the TupleCaptureBolt thread-safe
---
.../org/apache/storm/testing/TupleCaptureBolt.java | 33 ++++++++++++++--------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
index 19bdf86..6d07372 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java
@@ -17,22 +17,29 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
-
public class TupleCaptureBolt implements IRichBolt {
- public static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<>();
- private String name;
+ /*
+ * Even though normally bolts do not need to care about thread safety, this particular bolt is different.
+ * It maintains a static field that is prepopulated before the topology starts, is written into by the topology,
+ * and is then read from after the topology is completed - all of this by potentially different threads.
+ */
+
+ private static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new ConcurrentHashMap<>();
+
+ private final String name;
private OutputCollector collector;
public TupleCaptureBolt() {
name = UUID.randomUUID().toString();
- emitted_tuples.put(name, new HashMap<String, List<FixedTuple>>());
+ emitted_tuples.put(name, new ConcurrentHashMap<String, List<FixedTuple>>());
}
@Override
@@ -43,11 +50,14 @@ public class TupleCaptureBolt implements IRichBolt {
@Override
public void execute(Tuple input) {
String component = input.getSourceComponent();
- Map<String, List<FixedTuple>> captured = emitted_tuples.get(name);
- if (!captured.containsKey(component)) {
- captured.put(component, new ArrayList<FixedTuple>());
- }
- captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
+ emitted_tuples.get(name)
+ .compute(component, (String key, List<FixedTuple> tuples) -> {
+ if (tuples == null) {
+ tuples = new ArrayList<>();
+ }
+ tuples.add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
+ return tuples;
+ });
collector.ack(input);
}
@@ -64,8 +74,9 @@ public class TupleCaptureBolt implements IRichBolt {
}
public Map<String, List<FixedTuple>> getAndClearResults() {
- Map<String, List<FixedTuple>> ret = new HashMap<>(emitted_tuples.get(name));
- emitted_tuples.get(name).clear();
+ Map<String, List<FixedTuple>> results = emitted_tuples.get(name);
+ Map<String, List<FixedTuple>> ret = new HashMap<>(results);
+ results.clear();
return ret;
}