You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/31 18:17:11 UTC

[1/2] storm git commit: remove context connection & add context connect specification

Repository: storm
Updated Branches:
  refs/heads/master 4964112d9 -> 3356f1f76


remove context connection & add context connect specification

N/A

Signed-off-by: pczb <pc...@163.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a4f1d0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a4f1d0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a4f1d0f

Branch: refs/heads/master
Commit: 7a4f1d0ff8f49243f8ca7440ed29d6c10c136269
Parents: 290458c
Author: pczb <pc...@163.com>
Authored: Thu May 10 01:41:18 2018 +0800
Committer: pczb <pc...@163.com>
Committed: Thu May 17 23:02:02 2018 +0800

----------------------------------------------------------------------
 .../org/apache/storm/messaging/IContext.java    |  1 +
 .../apache/storm/messaging/netty/Client.java    | 11 ++----
 .../apache/storm/messaging/netty/Context.java   | 38 ++++++--------------
 3 files changed, 14 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7a4f1d0f/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index c2c9c8a..8d2c0dc 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -47,6 +47,7 @@ public interface IContext {
 
     /**
      * This method establish a client side connection to a remote server
+     * implementation should return a new connection every call
      *
      * @param storm_id       topology ID
      * @param host           remote host

http://git-wip-us.apache.org/repos/asf/storm/blob/7a4f1d0f/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 2736e2f..3cdde87 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -78,10 +78,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private final StormBoundedExponentialBackoffRetry retryPolicy;
     private final ClientBootstrap bootstrap;
     private final InetSocketAddress dstAddress;
-    //The actual name of the host we are trying to connect to so that
-    // when we remove ourselves from the connection cache there is no concern that
-    // the resolved host name is different.
-    private final String dstHost;
+
     /**
      * The channel used for all write operations from this client to the remote destination.
      */
@@ -114,7 +111,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
      * Whether the SASL channel is ready.
      */
     private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
-    private final Context context;
     private final HashedWheelTimer scheduler;
     private final MessageBuffer batcher;
     // wait strategy when the netty channel is not writable
@@ -128,11 +124,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private volatile boolean closing = false;
 
     Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, ChannelFactory factory, HashedWheelTimer scheduler, String host,
-           int port, Context context) {
+           int port) {
         this.topoConf = topoConf;
         closing = false;
         this.scheduler = scheduler;
-        this.context = context;
         int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
         int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
@@ -149,7 +144,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
 
         // Initiate connection to remote destination
         bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
-        dstHost = host;
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         launchChannelAliveThread();
@@ -451,7 +445,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     public void close() {
         if (!closing) {
             LOG.info("closing Netty Client {}", dstAddressPrefixedName);
-            context.removeClient(dstHost, dstAddress.getPort());
             // Set closing to true to prevent any further reconnection attempts.
             closing = true;
             waitForPendingMessagesToBeSent();

http://git-wip-us.apache.org/repos/asf/storm/blob/7a4f1d0f/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 1816548..3614a94 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -12,7 +12,8 @@
 
 package org.apache.storm.messaging.netty;
 
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -26,7 +27,7 @@ import org.jboss.netty.util.HashedWheelTimer;
 
 public class Context implements IContext {
     private Map<String, Object> topoConf;
-    private Map<String, IConnection> connections;
+    private List<Server> serverConnections;
     private NioClientSocketChannelFactory clientChannelFactory;
     private HashedWheelTimer clientScheduleService;
 
@@ -35,7 +36,7 @@ public class Context implements IContext {
      */
     public void prepare(Map<String, Object> topoConf) {
         this.topoConf = topoConf;
-        connections = new HashMap<>();
+        serverConnections = new ArrayList<>();
 
         //each context will have a single client channel factory
         int maxWorkers = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -56,29 +57,17 @@ public class Context implements IContext {
      * establish a server with a binding port
      */
     public synchronized IConnection bind(String storm_id, int port) {
-        IConnection server = new Server(topoConf, port);
-        connections.put(key(storm_id, server.getPort()), server);
+        Server server = new Server(topoConf, port);
+        serverConnections.add(server);
         return server;
     }
 
     /**
      * establish a connection to a remote server
      */
-    public synchronized IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
-        IConnection connection = connections.get(key(host, port));
-        if (connection != null) {
-            return connection;
-        }
-        IConnection client = new Client(topoConf, remoteBpStatus, clientChannelFactory,
-                                        clientScheduleService, host, port, this);
-        connections.put(key(host, client.getPort()), client);
-        return client;
-    }
-
-    synchronized void removeClient(String host, int port) {
-        if (connections != null) {
-            connections.remove(key(host, port));
-        }
+    public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
+        return new Client(topoConf, remoteBpStatus, clientChannelFactory,
+                                        clientScheduleService, host, port);
     }
 
     /**
@@ -87,18 +76,13 @@ public class Context implements IContext {
     public synchronized void term() {
         clientScheduleService.stop();
 
-        for (IConnection conn : connections.values()) {
+        for (Server conn : serverConnections) {
             conn.close();
         }
-
-        connections = null;
+        serverConnections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
 
     }
-
-    private String key(String host, int port) {
-        return String.format("%s:%d", host, port);
-    }
 }


[2/2] storm git commit: Merge branch 'dev' of https://github.com/pczb/storm into STORM-3055

Posted by bo...@apache.org.
Merge branch 'dev' of https://github.com/pczb/storm into STORM-3055

STORM-3055: remove context connection cache


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3356f1f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3356f1f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3356f1f7

Branch: refs/heads/master
Commit: 3356f1f764135570b948f6614c4816f49fd68dbd
Parents: 4964112 7a4f1d0
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu May 31 12:56:10 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu May 31 12:56:10 2018 -0500

----------------------------------------------------------------------
 .../org/apache/storm/messaging/IContext.java    |  1 +
 .../apache/storm/messaging/netty/Client.java    | 11 ++----
 .../apache/storm/messaging/netty/Context.java   | 38 ++++++--------------
 3 files changed, 14 insertions(+), 36 deletions(-)
----------------------------------------------------------------------