You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2013/12/07 07:10:02 UTC

[06/12] git commit: Ensure we don't overflow the backoff value.

Ensure we don't overflow the backoff value.

The first attempt to fix this (213102b36f890) did not correctly address
the issue.  The 32-bit signed integer backoff value frequently overflows,
resulting in an invalid bound being passed to Random.nextInt().

The default for storm.messaging.netty.max_retries is now 30 (instead of
100), and there is an upper limit of 30 for max_retries.

I also did a whitespace cleanup.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c638db0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c638db0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c638db0e

Branch: refs/heads/master
Commit: c638db0e88e3c56f808c8a76a88f94d7bf1988c4
Parents: 4e19589
Author: Brenden Matthews <br...@diddyinc.com>
Authored: Wed Oct 30 09:41:13 2013 -0700
Committer: Brenden Matthews <br...@diddyinc.com>
Committed: Wed Dec 4 14:30:52 2013 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |  2 +-
 .../backtype/storm/messaging/netty/Client.java  | 58 ++++++++++----------
 2 files changed, 29 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c638db0e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 08c7889..a5b31f4 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -86,7 +86,7 @@ zmq.hwm: 0
 storm.messaging.netty.server_worker_threads: 1
 storm.messaging.netty.client_worker_threads: 1
 storm.messaging.netty.buffer_size: 5242880 #5MB buffer
-storm.messaging.netty.max_retries: 100
+storm.messaging.netty.max_retries: 30
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c638db0e/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
index 00431d4..91e4bd4 100644
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -23,14 +23,14 @@ import backtype.storm.utils.Utils;
 
 class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private final int max_retries; 
-    private final int base_sleep_ms; 
-    private final int max_sleep_ms; 
+    private final int max_retries;
+    private final int base_sleep_ms;
+    private final int max_sleep_ms;
     private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
     private InetSocketAddress remote_addr;
-    private AtomicInteger retries; 
+    private AtomicInteger retries;
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
@@ -38,14 +38,14 @@ class Client implements IConnection {
 
     @SuppressWarnings("rawtypes")
     Client(Map storm_conf, String host, int port) {
-        message_queue = new LinkedBlockingQueue<Object>(); 
+        message_queue = new LinkedBlockingQueue<Object>();
         retries = new AtomicInteger(0);
         channelRef = new AtomicReference<Channel>(null);
         being_closed = new AtomicBoolean(false);
 
-        // Configure 
+        // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
         base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -74,9 +74,9 @@ class Client implements IConnection {
     void reconnect() {
         try {
             int tried_count = retries.incrementAndGet();
-            if (tried_count < max_retries) {
+            if (tried_count <= max_retries) {
                 Thread.sleep(getSleepTimeMs());
-                LOG.info("Reconnect ... [{}]", tried_count);   
+                LOG.info("Reconnect ... [{}]", tried_count);
                 bootstrap.connect(remote_addr);
                 LOG.debug("connection started...");
             } else {
@@ -85,7 +85,7 @@ class Client implements IConnection {
             }
         } catch (InterruptedException e) {
             LOG.warn("connection failed", e);
-        } 
+        }
     }
 
     /**
@@ -93,19 +93,17 @@ class Client implements IConnection {
      */
     private int getSleepTimeMs()
     {
-        int backoff = 1 << Math.max(1, retries.get());
+        int backoff = 1 << retries.get();
         int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
         if ( sleepMs > max_sleep_ms )
             sleepMs = max_sleep_ms;
-        if ( sleepMs < base_sleep_ms )
-          sleepMs = base_sleep_ms;
         return sleepMs;
     }
 
     /**
-     * Enqueue a task message to be sent to server 
+     * Enqueue a task message to be sent to server
      */
-    public void send(int task, byte[] message) {        
+    public void send(int task, byte[] message) {
         //throw exception if the client is being closed
         if (being_closed.get()) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
@@ -128,43 +126,43 @@ class Client implements IConnection {
         MessageBatch batch = new MessageBatch(buffer_size);
         Object msg = message_queue.take();
         batch.add(msg);
-        
+
         //we will discard any message after CLOSE
-        if (msg==ControlMessage.CLOSE_MESSAGE) 
+        if (msg==ControlMessage.CLOSE_MESSAGE)
             return batch;
-        
+
         while (!batch.isFull()) {
             //peek the next message
             msg = message_queue.peek();
             //no more messages
             if (msg == null) break;
-            
+
             //we will discard any message after CLOSE
             if (msg==ControlMessage.CLOSE_MESSAGE) {
                 message_queue.take();
                 batch.add(msg);
                 break;
             }
-            
+
             //try to add this msg into batch
             if (!batch.tryAdd((TaskMessage) msg))
                 break;
-            
+
             //remove this message
             message_queue.take();
         }
 
         return batch;
     }
-    
+
     /**
      * gracefully close this client.
-     * 
+     *
      * We will send all existing requests, and then invoke close_n_release() method
      */
     public synchronized void close() {
-        if (!being_closed.get()) {  
-            //enqueue a CLOSE message so that shutdown() will be invoked 
+        if (!being_closed.get()) {
+            //enqueue a CLOSE message so that shutdown() will be invoked
             try {
                 message_queue.put(ControlMessage.CLOSE_MESSAGE);
                 being_closed.set(true);
@@ -178,10 +176,10 @@ class Client implements IConnection {
      * close_n_release() is invoked after all messages have been sent.
      */
     void  close_n_release() {
-        if (channelRef.get() != null) 
+        if (channelRef.get() != null)
             channelRef.get().close().awaitUninterruptibly();
 
-        //we need to release resources 
+        //we need to release resources
         new Thread(new Runnable() {
             @Override
             public void run() {
@@ -194,10 +192,10 @@ class Client implements IConnection {
     }
 
     void setChannel(Channel channel) {
-        channelRef.set(channel); 
-        //reset retries   
+        channelRef.set(channel);
+        //reset retries
         if (channel != null)
-            retries.set(0); 
+            retries.set(0);
     }
 
 }