You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/09 15:48:10 UTC

[05/32] git commit: STORM-297, add more comments

STORM-297, add more comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c5e7a0d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c5e7a0d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c5e7a0d0

Branch: refs/heads/master
Commit: c5e7a0d0f137cc5f0ccf6116541c16d4a7727598
Parents: 2812e51
Author: Sean Zhong <cl...@gmail.com>
Authored: Thu May 8 10:32:46 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Thu May 8 10:32:46 2014 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                               |  9 ++++++++-
 .../backtype/storm/messaging/netty/Client.java   |  4 +++-
 .../storm/messaging/netty/MessageDecoder.java    |  1 +
 .../backtype/storm/messaging/netty/Server.java   |  7 +++++++
 .../src/jvm/backtype/storm/utils/Utils.java      | 19 -------------------
 5 files changed, 19 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4c15fce..cd40f5d 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -93,6 +93,8 @@ supervisor.enable: true
 ### worker.* configs are for task workers
 worker.childopts: "-Xmx768m"
 worker.heartbeat.frequency.secs: 1
+
+# control how many worker receiver threads we need per worker 
 worker.receiver.thread.count: 1
 
 task.heartbeat.frequency.secs: 3
@@ -109,10 +111,15 @@ storm.messaging.netty.buffer_size: 5242880 #5MB buffer
 storm.messaging.netty.max_retries: 30
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
+
+# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
 storm.messaging.netty.transfer.batch.size: 262144
+
+# If storm.messaging.netty.blocking is set to true, the Netty Client will send messages in synchronized way, otherwise it will do it in async way. Set storm.messaging.netty.blocking to false to improve the latency and throughput.
 storm.messaging.netty.blocking: false
-storm.messaging.netty.flush.check.interval.ms: 10
 
+# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
+storm.messaging.netty.flush.check.interval.ms: 10
 
 ### topology.* configs are for specific executing storms
 topology.enable.message.timeouts: true

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 46fd47a..85a904c 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -117,7 +117,7 @@ public class Client implements IConnection {
                 }
                 
             }
-        }, "netty-client-flush-checker");
+        }, name() + "-flush-checker");
         
         flushChecker.setDaemon(true);
         flushChecker.start();
@@ -224,6 +224,8 @@ public class Client implements IConnection {
                 flushRequest(channel, toBeFlushed, blocking);
                 
             } else {
+                // when channel is NOT writable, it means the internal netty buffer is full. 
+                // In this case, we can try to buffer up more incoming messages.
                 flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 8291d78..72c3cf7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -45,6 +45,7 @@ public class MessageDecoder extends FrameDecoder {
 
         List<Object> ret = new ArrayList<Object>();
 
+        // Use while loop, try to decode as more messages as possible in single call
         while (available >= 2) {
 
             // Mark the current buffer position before reading task/len field

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 71f01e0..f51af75 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -46,7 +46,11 @@ class Server implements IConnection {
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
+    
+    // Create multiple queues for incoming messages. The size equals the number of receiver threads.
+    // For message which is sent to same task, it will be stored in the same queue to preserve the message order.
     private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
+    
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
@@ -126,12 +130,15 @@ class Server implements IConnection {
     }
     
     private Integer getMessageQueueId(int task) {
+      // try to construct the map from taskId -> queueId in round robin manner.
+      
       Integer queueId = taskToQueueId.get(task);
       if (null == queueId) {
         synchronized(taskToQueueId) {
           //assgin task to queue in round-robin manner
           if (null == taskToQueueId.get(task)) {
             queueId = roundRobinQueueId++;
+            
             taskToQueueId.put(task, queueId);
             if (roundRobinQueueId == queueCount) {
               roundRobinQueueId = 0;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c5e7a0d0/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index b1892f1..6a0a447 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -400,25 +400,6 @@ public class Utils {
         return ret;
     }
     
-    public static void redirectStreamAsync(Process process) {
-      redirectStreamAsync(process.getInputStream(), System.out);
-      redirectStreamAsync(process.getErrorStream(), System.err);
-    }
-    
-    static void redirectStreamAsync(final InputStream input,
-        final PrintStream output) {
-      new Thread(new Runnable() {
-        @Override
-        public void run() {
-          Scanner scanner = new Scanner(input);
-          while (scanner.hasNextLine()) {
-            output.println(scanner.nextLine());
-          }
-        }
-      }).start();
-    }
- 
-
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
         CuratorFramework ret = newCurator(conf, servers, port);
         ret.start();