You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/09 15:48:13 UTC

[08/32] git commit: STORM-297: Use recv(int flags, int clientId) to replace recv(int flags) in IConnection interface

STORM-297: Use recv(int flags, int clientId) to replace recv(int flags) in IConnection interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c5c3571c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c5c3571c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c5c3571c

Branch: refs/heads/master
Commit: c5c3571ca15ee2dd675fb3cac44bd0f926ccfc67
Parents: 138a7a7
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue May 20 10:56:26 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue May 20 10:56:26 2014 +0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/messaging/local.clj   | 10 ----------
 .../src/jvm/backtype/storm/messaging/IConnection.java   |  6 ------
 .../src/jvm/backtype/storm/messaging/netty/Client.java  |  5 +----
 .../src/jvm/backtype/storm/messaging/netty/Server.java  | 12 +-----------
 .../clj/backtype/storm/messaging/netty_unit_test.clj    |  8 ++++----
 5 files changed, 6 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index de14806..801f22d 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -31,16 +31,6 @@
 
 (deftype LocalConnection [storm-id port queues-map lock queue]
   IConnection
-  (^Iterator recv [this ^int flags]
-    (when-not queue
-      (throw (IllegalArgumentException. "Cannot receive on this socket")))
-    (let [ret (ArrayList.)
-          msg (if (= flags 1) (.poll queue) (.take queue))]
-      (if msg
-        (do 
-          (.add ret msg)
-          (.iterator ret))
-        nil)))
   (^Iterator recv [this ^int flags ^int clientId]
     (when-not queue
       (throw (IllegalArgumentException. "Cannot receive on this socket")))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index fe9caa7..ead4935 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -20,12 +20,6 @@ package backtype.storm.messaging;
 import java.util.Iterator;
 
 public interface IConnection {   
-    /**
-     * receive a batch message iterator (consists taskId and payload)
-     * @param flags 0: block, 1: non-block
-     * @return
-     */
-    public Iterator<TaskMessage> recv(int flags);
     
     /**
      * receive a batch message iterator (consists taskId and payload)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 85a904c..8f0d7af 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -294,10 +294,7 @@ public class Client implements IConnection {
         }
     }
 
-    public Iterator<TaskMessage> recv(int flags) {
-        throw new RuntimeException("Client connection should not receive any messages");
-    }
-
+    @Override
     public Iterator<TaskMessage> recv(int flags, int clientId) {
         throw new RuntimeException("Client connection should not receive any messages");
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index d551f02..20a147d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -172,16 +172,6 @@ class Server implements IConnection {
           message_queue[receiverId].put(msgGroup);
         }
       }
-   }
-    
-    /**
-     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
-     */
-    public Iterator<TaskMessage> recv(int flags)  {
-      if (queueCount > 1) {
-        throw new RuntimeException("Use recv(int flags, int clientId) instead, as we have worker.receiver.thread.count=" + queueCount + " receive threads, clientId should be 0 <= clientId < " + queueCount);
-      }
-      return recv(flags, 0);
     }
     
     public Iterator<TaskMessage> recv(int flags, int receiverId)  {
@@ -210,7 +200,7 @@ class Server implements IConnection {
       }
       return null;
     }
-
+   
     /**
      * register a newly created channel
      * @param channel

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5c3571c/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index d76e245..ea7b8dc 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -37,7 +37,7 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0)
+        iter (.recv server 0 0)
         resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
@@ -59,7 +59,7 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        iter (.recv server 0)
+        iter (.recv server 0 0)
         resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
@@ -84,7 +84,7 @@
                 (fn []
                   (Thread/sleep 1000)
                   (let [server (.bind context nil port)
-                        iter (.recv server 0)
+                        iter (.recv server 0 0)
                         resp (.next iter)]
                     (is (= task (.task resp)))
                     (is (= req_msg (String. (.message resp))))
@@ -116,7 +116,7 @@
     (let [resp (ArrayList.)
           received (atom 0)]
       (while (< @received (- 100000 1))
-        (let [iter (.recv server 0)]
+        (let [iter (.recv server 0 0)]
           (while (.hasNext iter)
             (let [msg (.next iter)]
               (.add resp msg)