You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/09 15:48:13 UTC
[08/32] git commit: STORM-297: Use recv(int flags, int clientId) to replace recv(int flags) in IConnection interface
STORM-297: Use recv(int flags, int clientId) to replace recv(int flags) in IConnection interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c5c3571c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c5c3571c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c5c3571c
Branch: refs/heads/master
Commit: c5c3571ca15ee2dd675fb3cac44bd0f926ccfc67
Parents: 138a7a7
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 10:56:26 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 10:56:26 2014 +0800
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/messaging/local.clj | 10 ----------
.../src/jvm/backtype/storm/messaging/IConnection.java | 6 ------
.../src/jvm/backtype/storm/messaging/netty/Client.java | 5 +----
.../src/jvm/backtype/storm/messaging/netty/Server.java | 12 +-----------
.../clj/backtype/storm/messaging/netty_unit_test.clj | 8 ++++----
5 files changed, 6 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index de14806..801f22d 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -31,16 +31,6 @@
(deftype LocalConnection [storm-id port queues-map lock queue]
IConnection
- (^Iterator recv [this ^int flags]
- (when-not queue
- (throw (IllegalArgumentException. "Cannot receive on this socket")))
- (let [ret (ArrayList.)
- msg (if (= flags 1) (.poll queue) (.take queue))]
- (if msg
- (do
- (.add ret msg)
- (.iterator ret))
- nil)))
(^Iterator recv [this ^int flags ^int clientId]
(when-not queue
(throw (IllegalArgumentException. "Cannot receive on this socket")))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index fe9caa7..ead4935 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -20,12 +20,6 @@ package backtype.storm.messaging;
import java.util.Iterator;
public interface IConnection {
- /**
- * receive a batch message iterator (consists taskId and payload)
- * @param flags 0: block, 1: non-block
- * @return
- */
- public Iterator<TaskMessage> recv(int flags);
/**
* receive a batch message iterator (consists taskId and payload)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 85a904c..8f0d7af 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -294,10 +294,7 @@ public class Client implements IConnection {
}
}
- public Iterator<TaskMessage> recv(int flags) {
- throw new RuntimeException("Client connection should not receive any messages");
- }
-
+ @Override
public Iterator<TaskMessage> recv(int flags, int clientId) {
throw new RuntimeException("Client connection should not receive any messages");
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index d551f02..20a147d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -172,16 +172,6 @@ class Server implements IConnection {
message_queue[receiverId].put(msgGroup);
}
}
- }
-
- /**
- * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
- */
- public Iterator<TaskMessage> recv(int flags) {
- if (queueCount > 1) {
- throw new RuntimeException("Use recv(int flags, int clientId) instead, as we have worker.receiver.thread.count=" + queueCount + " receive threads, clientId should be 0 <= clientId < " + queueCount);
- }
- return recv(flags, 0);
}
public Iterator<TaskMessage> recv(int flags, int receiverId) {
@@ -210,7 +200,7 @@ class Server implements IConnection {
}
return null;
}
-
+
/**
* register a newly created channel
* @param channel
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index d76e245..ea7b8dc 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -37,7 +37,7 @@
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
- iter (.recv server 0)
+ iter (.recv server 0 0)
resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
@@ -59,7 +59,7 @@
server (.bind context nil port)
client (.connect context nil "localhost" port)
_ (.send client task (.getBytes req_msg))
- iter (.recv server 0)
+ iter (.recv server 0 0)
resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
@@ -84,7 +84,7 @@
(fn []
(Thread/sleep 1000)
(let [server (.bind context nil port)
- iter (.recv server 0)
+ iter (.recv server 0 0)
resp (.next iter)]
(is (= task (.task resp)))
(is (= req_msg (String. (.message resp))))
@@ -116,7 +116,7 @@
(let [resp (ArrayList.)
received (atom 0)]
(while (< @received (- 100000 1))
- (let [iter (.recv server 0)]
+ (let [iter (.recv server 0 0)]
(while (.hasNext iter)
(let [msg (.next iter)]
(.add resp msg)