You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/31 19:08:15 UTC
storm git commit: Merge branch 'fix' of https://github.com/pczb/storm
into STORM-3055-1.x
Repository: storm
Updated Branches:
refs/heads/1.x-branch 554ff4a0f -> aeb69dd13
Merge branch 'fix' of https://github.com/pczb/storm into STORM-3055-1.x
STORM-3055: remove context connection cache
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aeb69dd1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aeb69dd1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aeb69dd1
Branch: refs/heads/1.x-branch
Commit: aeb69dd13791ef238c9a3a2d2ef660106f7f21cd
Parents: 554ff4a
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu May 31 13:18:07 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu May 31 13:35:17 2018 -0500
----------------------------------------------------------------------
.../org/apache/storm/messaging/IContext.java | 2 +
.../apache/storm/messaging/netty/Client.java | 12 +-----
.../apache/storm/messaging/netty/Context.java | 40 ++++++--------------
3 files changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/aeb69dd1/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/IContext.java b/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
index 72812b1..595568a 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/IContext.java
@@ -50,6 +50,8 @@ public interface IContext {
/**
* This method establish a client side connection to a remote server
+ * implementation should return a new connection every call
+ *
* @param storm_id topology ID
* @param host remote host
* @param port remote port
http://git-wip-us.apache.org/repos/asf/storm/blob/aeb69dd1/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
index b72b2bd..b665f61 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -81,10 +81,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final ClientBootstrap bootstrap;
private final InetSocketAddress dstAddress;
protected final String dstAddressPrefixedName;
- //The actual name of the host we are trying to connect to so that
- // when we remove ourselves from the connection cache there is no concern that
- // the resolved host name is different.
- private final String dstHost;
+
private volatile Map<Integer, Double> serverLoad = null;
/**
@@ -133,8 +130,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
*/
private volatile boolean closing = false;
- private final Context context;
-
private final HashedWheelTimer scheduler;
private final MessageBuffer batcher;
@@ -142,11 +137,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final Object writeLock = new Object();
@SuppressWarnings("rawtypes")
- Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
+ Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
this.stormConf = stormConf;
closing = false;
this.scheduler = scheduler;
- this.context = context;
int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
// if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
saslChannelReady.set(!Utils.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
@@ -160,7 +154,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
// Initiate connection to remote destination
bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
- dstHost = host;
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
launchChannelAliveThread();
@@ -425,7 +418,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
public void close() {
if (!closing) {
LOG.info("closing Netty Client {}", dstAddressPrefixedName);
- context.removeClient(dstHost, dstAddress.getPort());
// Set closing to true to prevent any further reconnection attempts.
closing = true;
waitForPendingMessagesToBeSent();
http://git-wip-us.apache.org/repos/asf/storm/blob/aeb69dd1/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
index 3b65eb5..dae8e85 100644
--- a/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -19,9 +19,11 @@ package org.apache.storm.messaging.netty;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
@@ -32,7 +34,7 @@ import org.apache.storm.utils.Utils;
public class Context implements IContext {
@SuppressWarnings("rawtypes")
private Map storm_conf;
- private Map<String, IConnection> connections;
+ private List<Server> serverConnections;
private NioClientSocketChannelFactory clientChannelFactory;
private HashedWheelTimer clientScheduleService;
@@ -43,7 +45,7 @@ public class Context implements IContext {
@SuppressWarnings("rawtypes")
public void prepare(Map storm_conf) {
this.storm_conf = storm_conf;
- connections = new HashMap<>();
+ serverConnections = new ArrayList<>();
//each context will have a single client channel factory
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -64,30 +66,17 @@ public class Context implements IContext {
* establish a server with a binding port
*/
public synchronized IConnection bind(String storm_id, int port) {
- IConnection server = new Server(storm_conf, port);
- connections.put(key(storm_id, port), server);
+ Server server = new Server(storm_conf, port);
+ serverConnections.add(server);
return server;
}
/**
* establish a connection to a remote server
*/
- public synchronized IConnection connect(String storm_id, String host, int port) {
- IConnection connection = connections.get(key(host,port));
- if(connection !=null)
- {
- return connection;
- }
- IConnection client = new Client(storm_conf, clientChannelFactory,
- clientScheduleService, host, port, this);
- connections.put(key(host, port), client);
- return client;
- }
-
- synchronized void removeClient(String host, int port) {
- if (connections != null) {
- connections.remove(key(host, port));
- }
+ public IConnection connect(String storm_id, String host, int port) {
+ return new Client(storm_conf, clientChannelFactory,
+ clientScheduleService, host, port);
}
/**
@@ -96,18 +85,13 @@ public class Context implements IContext {
public synchronized void term() {
clientScheduleService.stop();
- for (IConnection conn : connections.values()) {
+ for (Server conn : serverConnections) {
conn.close();
}
-
- connections = null;
+ serverConnections = null;
//we need to release resources associated with client channel factory
clientChannelFactory.releaseExternalResources();
}
-
- private String key(String host, int port) {
- return String.format("%s:%d", host, port);
- }
}