You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/31 19:08:15 UTC

storm git commit: Merge branch 'fix' of https://github.com/pczb/storm into STORM-3055-1.x

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 554ff4a0f -> aeb69dd13


Merge branch 'fix' of https://github.com/pczb/storm into STORM-3055-1.x

STORM-3055: remove context connection cache


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aeb69dd1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aeb69dd1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aeb69dd1

Branch: refs/heads/1.x-branch
Commit: aeb69dd13791ef238c9a3a2d2ef660106f7f21cd
Parents: 554ff4a
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu May 31 13:18:07 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu May 31 13:35:17 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/messaging/IContext.java    |  2 +
 .../apache/storm/messaging/netty/Client.java    | 12 +-----
 .../apache/storm/messaging/netty/Context.java   | 40 ++++++--------------
 3 files changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aeb69dd1/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/IContext.java b/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
index 72812b1..595568a 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
@@ -50,6 +50,8 @@ public interface IContext {
     
     /**
      * This method establish a client side connection to a remote server
+     * implementation should return a new connection every call
+     *
      * @param storm_id topology ID
      * @param host remote host
      * @param port remote port

http://git-wip-us.apache.org/repos/asf/storm/blob/aeb69dd1/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
index b72b2bd..b665f61 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -81,10 +81,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private final ClientBootstrap bootstrap;
     private final InetSocketAddress dstAddress;
     protected final String dstAddressPrefixedName;
-    //The actual name of the host we are trying to connect to so that
-    // when we remove ourselves from the connection cache there is no concern that
-    // the resolved host name is different.
-    private final String dstHost;
+
     private volatile Map<Integer, Double> serverLoad = null;
 
     /**
@@ -133,8 +130,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      */
     private volatile boolean closing = false;
 
-    private final Context context;
-
     private final HashedWheelTimer scheduler;
 
     private final MessageBuffer batcher;
@@ -142,11 +137,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private final Object writeLock = new Object();
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
+    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
         this.stormConf = stormConf;
         closing = false;
         this.scheduler = scheduler;
-        this.context = context;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
         saslChannelReady.set(!Utils.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
@@ -160,7 +154,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
         // Initiate connection to remote destination
         bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
-        dstHost = host;
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         launchChannelAliveThread();
@@ -425,7 +418,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
-            context.removeClient(dstHost, dstAddress.getPort());
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
             waitForPendingMessagesToBeSent();

http://git-wip-us.apache.org/repos/asf/storm/blob/aeb69dd1/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
index 3b65eb5..dae8e85 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -19,9 +19,11 @@ package org.apache.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.storm.Config;
@@ -32,7 +34,7 @@ import org.apache.storm.utils.Utils;
 public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
-    private Map<String, IConnection> connections;
+    private List<Server> serverConnections;
     private NioClientSocketChannelFactory clientChannelFactory;
     
     private HashedWheelTimer clientScheduleService;
@@ -43,7 +45,7 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
-        connections = new HashMap<>();
+        serverConnections = new ArrayList<>();
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -64,30 +66,17 @@ public class Context implements IContext {
      * establish a server with a binding port
      */
     public synchronized IConnection bind(String storm_id, int port) {
-        IConnection server = new Server(storm_conf, port);
-        connections.put(key(storm_id, port), server);
+        Server server = new Server(storm_conf, port);
+        serverConnections.add(server);
         return server;
     }
 
     /**
      * establish a connection to a remote server
      */
-    public synchronized IConnection connect(String storm_id, String host, int port) {
-        IConnection connection = connections.get(key(host,port));
-        if(connection !=null)
-        {
-            return connection;
-        }
-        IConnection client =  new Client(storm_conf, clientChannelFactory, 
-                clientScheduleService, host, port, this);
-        connections.put(key(host, port), client);
-        return client;
-    }
-
-    synchronized void removeClient(String host, int port) {
-        if (connections != null) {
-            connections.remove(key(host, port));
-        }
+    public IConnection connect(String storm_id, String host, int port) {
+        return new Client(storm_conf, clientChannelFactory,
+                clientScheduleService, host, port);
     }
 
     /**
@@ -96,18 +85,13 @@ public class Context implements IContext {
     public synchronized void term() {
         clientScheduleService.stop();
 
-        for (IConnection conn : connections.values()) {
+        for (Server conn : serverConnections) {
             conn.close();
         }
-
-        connections = null;
+        serverConnections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
 
     }
-
-    private String key(String host, int port) {
-        return String.format("%s:%d", host, port);
-    }
 }