You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 20:10:48 UTC
[1/2] storm git commit: Merge branch 'STORM-737' of
github.com:HeartSaVioR/storm
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch ed2e89117 -> 7e5e5f0e3
Merge branch 'STORM-737' of github.com:HeartSaVioR/storm
Conflicts:
storm-core/src/clj/backtype/storm/daemon/worker.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/acc1a79d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/acc1a79d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/acc1a79d
Branch: refs/heads/0.10.x-branch
Commit: acc1a79de17802c59810193311aa182b963e8511
Parents: ed2e891
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 13:53:39 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 13:58:23 2015 -0400
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/worker.clj | 13 ++--
.../backtype/storm/utils/TransferDrainer.java | 62 +++++++++++++-------
2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/acc1a79d/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index a83146b..bd5b9c1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -129,10 +129,10 @@
(.add local pair)
;;Using java objects directly to avoid performance issues in java code
- (let [node+port (get @task->node+port task)]
- (when (not (.get remoteMap node+port))
- (.put remoteMap node+port (ArrayList.)))
- (let [remote (.get remoteMap node+port)]
+ (do
+ (when (not (.get remoteMap task))
+ (.put remoteMap task (ArrayList.)))
+ (let [remote (.get remoteMap task)]
(if (not-nil? task)
(.add remote (TaskMessage. task (.serialize serializer tuple)))
(log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
@@ -342,8 +342,9 @@
(when batch-end?
(read-locked endpoint-socket-lock
- (let [node+port->socket @node+port->socket]
- (.send drainer node+port->socket)))
+ (let [node+port->socket @node+port->socket
+ task->node+port @task->node+port]
+ (.send drainer task->node+port node+port->socket)))
(.clear drainer))))))
;; Check whether this messaging connection is ready to send data
http://git-wip-us.apache.org/repos/asf/storm/blob/acc1a79d/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 0e53632..5a111ce 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -23,40 +23,60 @@ import java.util.Iterator;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
+import com.google.common.collect.Maps;
public class TransferDrainer {
- private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
+ private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
- public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
- for (String key : workerTupleSetMap.keySet()) {
-
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
- if (null == bundle) {
- bundle = new ArrayList<ArrayList<TaskMessage>>();
- bundles.put(key, bundle);
- }
-
- ArrayList tupleSet = workerTupleSetMap.get(key);
- if (null != tupleSet && tupleSet.size() > 0) {
- bundle.add(tupleSet);
- }
- }
+ public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
+ for (Integer task : taskTupleSetMap.keySet()) {
+ addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
+ }
}
- public void send(HashMap<String, IConnection> connections) {
- for (String hostPort : bundles.keySet()) {
+ public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
+ HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+ for (String hostPort : bundleMapByDestination.keySet()) {
IConnection connection = connections.get(hostPort);
- if (null != connection) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+ if (null != connection) {
+ ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
Iterator<TaskMessage> iter = getBundleIterator(bundle);
if (null != iter && iter.hasNext()) {
connection.send(iter);
}
}
- }
+ }
}
-
+
+ private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
+ HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
+ for (Integer task : this.bundles.keySet()) {
+ String hostPort = taskToNode.get(task);
+ if (hostPort != null) {
+ for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
+ addListRefToMap(bundleMap, hostPort, chunk);
+ }
+ }
+ }
+ return bundleMap;
+ }
+
+ private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
+ T key, ArrayList<TaskMessage> tuples) {
+ ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);
+
+ if (null == bundle) {
+ bundle = new ArrayList<ArrayList<TaskMessage>>();
+ bundleMap.put(key, bundle);
+ }
+
+ if (null != tuples && tuples.size() > 0) {
+ bundle.add(tuples);
+ }
+ }
+
private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
if (null == bundle) {
[2/2] storm git commit: add STORM-737 to changelog
Posted by pt...@apache.org.
add STORM-737 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e5e5f0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e5e5f0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e5e5f0e
Branch: refs/heads/0.10.x-branch
Commit: 7e5e5f0e38069a069d560878aeb88e1a5f8015f8
Parents: acc1a79
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 14:00:56 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 14:00:56 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7e5e5f0e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 50dcb2d..b0b8d0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.10.0
+ * STORM-737: Check task->node+port with read lock to prevent sending to closed connection
* STORM-835 Netty Client hold batch object until io operation complete
* STORM-827: Allow AutoTGT to work with storm-hdfs too.
* STORM-728: Put emitted and transferred stats under correct columns