You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/07/14 21:06:46 UTC

[15/23] storm git commit: Abandon the idea of scheduling timeouts as needed as it's still performing worse than the original version

Abandon the idea of scheduling timeouts as needed as it's still performing worse than the original version


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afa638cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afa638cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afa638cd

Branch: refs/heads/0.9.x-branch
Commit: afa638cdbf2440d88af3775c0f02b4dc792922b0
Parents: 832b5db
Author: Enno Shioji <es...@gmail.com>
Authored: Wed Jun 3 00:26:53 2015 +0100
Committer: Enno Shioji <es...@gmail.com>
Committed: Wed Jun 3 00:26:53 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 127 ++++++++++---------
 .../backtype/storm/messaging/netty/Context.java |  32 +++--
 2 files changed, 85 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/afa638cd/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 0d75448..3652886 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -23,6 +23,7 @@ import backtype.storm.messaging.TaskMessage;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
+import com.google.common.base.Throwables;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -41,6 +42,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -120,14 +122,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private final int messageBatchSize;
 
-    private final HashedWheelTimer scheduler;
+    private final ScheduledExecutorService scheduler;
 
     private final Object pendingMessageLock = new Object();
     private MessageBatch pendingMessage;
     private Timeout pendingFlush;
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
         this.scheduler = scheduler;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -147,7 +149,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
         // Dummy values to avoid null checks
         pendingMessage = new MessageBatch(messageBatchSize);
-        pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
+        scheduler.scheduleWithFixedDelay(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -489,37 +491,33 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * This task runs on a single thread shared among all clients, and thus
      * should not perform operations that block or are expensive.
      */
-    private class Flush implements TimerTask {
-        private final MessageBatch instructor;
-
-        private Flush(MessageBatch instructor) {
-            this.instructor = instructor;
-        }
-
+    private class Flush implements Runnable {
         @Override
-        public void run(Timeout timeout) throws Exception {
-            MessageBatch toSend;
-            MessageBatch replacement = new MessageBatch(messageBatchSize);
-            synchronized (pendingMessageLock){
-                if(instructor == pendingMessage){
-                    // It's still the batch which scheduled this timeout
-                    toSend = pendingMessage;
-                    pendingMessage = replacement;
-                    checkState(!toSend.isFull(), "Only unfilled batches should get timeouts scheduled");
-                } else {
-                    // It's no longer the batch which scheduled this timeout
-                    // No need to work on this one
-                    toSend = null;
-                }
-            }
-
-            if(toSend!=null){
+        public void run() {
+            try {
                 Channel channel = getConnectedChannel();
-                if(channel == null) {
-                    dropMessages(toSend);
+                if (channel == null || !channel.isWritable()) {
+                    // Connection not available or buffer is full, no point in flushing
+                    return;
                 } else {
+                    // Connection is available and there is room in Netty's buffer
+                    MessageBatch toSend;
+                    synchronized (pendingMessageLock) {
+                        if(pendingMessage.isEmpty()){
+                            // Nothing to flush
+                            return;
+                        } else {
+                            toSend = pendingMessage;
+                            pendingMessage = new MessageBatch(messageBatchSize);
+                        }
+                    }
+                    checkState(!toSend.isFull(), "Filled batches should never be in pendingMessage field");
+
                     flushMessages(channel, toSend);
                 }
+            }catch (Throwable e){
+                LOG.error("Uncaught throwable", e);
+                throw Throwables.propagate(e);
             }
         }
     }
@@ -529,7 +527,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * This task runs on a single thread shared among all clients, and thus
      * should not perform operations that block.
      */
-    private class Connect implements TimerTask {
+    private class Connect implements Runnable {
 
         private final InetSocketAddress address;
 
@@ -548,41 +546,46 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
 
         @Override
-        public void run(Timeout timeout) throws Exception {
-            if (reconnectingAllowed()) {
-                final int connectionAttempt = connectionAttempts.getAndIncrement();
-                totalConnectionAttempts.getAndIncrement();
-
-                LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
-                ChannelFuture future = bootstrap.connect(address);
-                future.addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        // This call returns immediately
-                        Channel newChannel = future.getChannel();
-
-                        if (future.isSuccess() && connectionEstablished(newChannel)) {
-                            boolean setChannel = channelRef.compareAndSet(null, newChannel);
-                            checkState(setChannel);
-                            LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
-                                    connectionAttempt);
-                            if (messagesLost.get() > 0) {
-                                LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
-                            }
-                        } else {
-                            Throwable cause = future.getCause();
-                            reschedule(cause);
-                            if (newChannel != null) {
-                                newChannel.close();
+        public void run() {
+            try {
+                if (reconnectingAllowed()) {
+                    final int connectionAttempt = connectionAttempts.getAndIncrement();
+                    totalConnectionAttempts.getAndIncrement();
+
+                    LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+                    ChannelFuture future = bootstrap.connect(address);
+                    future.addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            // This call returns immediately
+                            Channel newChannel = future.getChannel();
+
+                            if (future.isSuccess() && connectionEstablished(newChannel)) {
+                                boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                                checkState(setChannel);
+                                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+                                        connectionAttempt);
+                                if (messagesLost.get() > 0) {
+                                    LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                                }
+                            } else {
+                                Throwable cause = future.getCause();
+                                reschedule(cause);
+                                if (newChannel != null) {
+                                    newChannel.close();
+                                }
                             }
                         }
-                    }
-                });
-            } else {
-                close();
-                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                        connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+                    });
+                } else {
+                    close();
+                    throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
+                            connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
+                }
+            }catch (Throwable e){
+                LOG.error("Uncaught throwable", e);
+                throw Throwables.propagate(e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/afa638cd/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 64f67ba..7e0cb0d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,7 +18,6 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
@@ -35,13 +34,14 @@ import backtype.storm.utils.Utils;
 
 public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-        
+
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
-    
-    private HashedWheelTimer clientScheduleService;
+
+    private ScheduledExecutorService clientScheduleService;
+    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
 
     /**
      * initialization per Storm configuration 
@@ -53,7 +53,7 @@ public class Context implements IContext {
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
         if (maxWorkers > 0) {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
@@ -62,8 +62,10 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
-        
-        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"), 10, TimeUnit.MILLISECONDS);
+
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
+        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -78,8 +80,8 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
-    public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, clientChannelFactory, 
+    public IConnection connect(String storm_id, String host, int port) {
+        IConnection client =  new Client(storm_conf, clientChannelFactory,
                 clientScheduleService, host, port);
         connections.add(client);
         return client;
@@ -89,12 +91,18 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
-        clientScheduleService.stop();
-        
+        clientScheduleService.shutdown();
+
         for (IConnection conn : connections) {
             conn.close();
         }
-        
+
+        try {
+            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Error when shutting down client scheduler", e);
+        }
+
         connections = null;
 
         //we need to release resources associated with client channel factory