You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/31 18:17:11 UTC
[1/2] storm git commit: remove context connection & add context
connect specification
Repository: storm
Updated Branches:
refs/heads/master 4964112d9 -> 3356f1f76
remove context connection & add context connect specification
N/A
Signed-off-by: pczb <pc...@163.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7a4f1d0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7a4f1d0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7a4f1d0f
Branch: refs/heads/master
Commit: 7a4f1d0ff8f49243f8ca7440ed29d6c10c136269
Parents: 290458c
Author: pczb <pc...@163.com>
Authored: Thu May 10 01:41:18 2018 +0800
Committer: pczb <pc...@163.com>
Committed: Thu May 17 23:02:02 2018 +0800
----------------------------------------------------------------------
.../org/apache/storm/messaging/IContext.java | 1 +
.../apache/storm/messaging/netty/Client.java | 11 ++----
.../apache/storm/messaging/netty/Context.java | 38 ++++++--------------
3 files changed, 14 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7a4f1d0f/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
index c2c9c8a..8d2c0dc 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -47,6 +47,7 @@ public interface IContext {
/**
* This method establish a client side connection to a remote server
+ * implementation should return a new connection every call
*
* @param storm_id topology ID
* @param host remote host
http://git-wip-us.apache.org/repos/asf/storm/blob/7a4f1d0f/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 2736e2f..3cdde87 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -78,10 +78,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final StormBoundedExponentialBackoffRetry retryPolicy;
private final ClientBootstrap bootstrap;
private final InetSocketAddress dstAddress;
- //The actual name of the host we are trying to connect to so that
- // when we remove ourselves from the connection cache there is no concern that
- // the resolved host name is different.
- private final String dstHost;
+
/**
* The channel used for all write operations from this client to the remote destination.
*/
@@ -114,7 +111,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
* Whether the SASL channel is ready.
*/
private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
- private final Context context;
private final HashedWheelTimer scheduler;
private final MessageBuffer batcher;
// wait strategy when the netty channel is not writable
@@ -128,11 +124,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private volatile boolean closing = false;
Client(Map<String, Object> topoConf, AtomicBoolean[] remoteBpStatus, ChannelFactory factory, HashedWheelTimer scheduler, String host,
- int port, Context context) {
+ int port) {
this.topoConf = topoConf;
closing = false;
this.scheduler = scheduler;
- this.context = context;
int bufferSize = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
int lowWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_LOW_WATERMARK));
int highWatermark = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_HIGH_WATERMARK));
@@ -149,7 +144,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
// Initiate connection to remote destination
bootstrap = createClientBootstrap(factory, bufferSize, lowWatermark, highWatermark, topoConf, remoteBpStatus);
- dstHost = host;
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
launchChannelAliveThread();
@@ -451,7 +445,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
public void close() {
if (!closing) {
LOG.info("closing Netty Client {}", dstAddressPrefixedName);
- context.removeClient(dstHost, dstAddress.getPort());
// Set closing to true to prevent any further reconnection attempts.
closing = true;
waitForPendingMessagesToBeSent();
http://git-wip-us.apache.org/repos/asf/storm/blob/7a4f1d0f/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
index 1816548..3614a94 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -12,7 +12,8 @@
package org.apache.storm.messaging.netty;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -26,7 +27,7 @@ import org.jboss.netty.util.HashedWheelTimer;
public class Context implements IContext {
private Map<String, Object> topoConf;
- private Map<String, IConnection> connections;
+ private List<Server> serverConnections;
private NioClientSocketChannelFactory clientChannelFactory;
private HashedWheelTimer clientScheduleService;
@@ -35,7 +36,7 @@ public class Context implements IContext {
*/
public void prepare(Map<String, Object> topoConf) {
this.topoConf = topoConf;
- connections = new HashMap<>();
+ serverConnections = new ArrayList<>();
//each context will have a single client channel factory
int maxWorkers = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
@@ -56,29 +57,17 @@ public class Context implements IContext {
* establish a server with a binding port
*/
public synchronized IConnection bind(String storm_id, int port) {
- IConnection server = new Server(topoConf, port);
- connections.put(key(storm_id, server.getPort()), server);
+ Server server = new Server(topoConf, port);
+ serverConnections.add(server);
return server;
}
/**
* establish a connection to a remote server
*/
- public synchronized IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
- IConnection connection = connections.get(key(host, port));
- if (connection != null) {
- return connection;
- }
- IConnection client = new Client(topoConf, remoteBpStatus, clientChannelFactory,
- clientScheduleService, host, port, this);
- connections.put(key(host, client.getPort()), client);
- return client;
- }
-
- synchronized void removeClient(String host, int port) {
- if (connections != null) {
- connections.remove(key(host, port));
- }
+ public IConnection connect(String storm_id, String host, int port, AtomicBoolean[] remoteBpStatus) {
+ return new Client(topoConf, remoteBpStatus, clientChannelFactory,
+ clientScheduleService, host, port);
}
/**
@@ -87,18 +76,13 @@ public class Context implements IContext {
public synchronized void term() {
clientScheduleService.stop();
- for (IConnection conn : connections.values()) {
+ for (Server conn : serverConnections) {
conn.close();
}
-
- connections = null;
+ serverConnections = null;
//we need to release resources associated with client channel factory
clientChannelFactory.releaseExternalResources();
}
-
- private String key(String host, int port) {
- return String.format("%s:%d", host, port);
- }
}
[2/2] storm git commit: Merge branch 'dev' of
https://github.com/pczb/storm into STORM-3055
Posted by bo...@apache.org.
Merge branch 'dev' of https://github.com/pczb/storm into STORM-3055
STORM-3055: remove context connection cache
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3356f1f7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3356f1f7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3356f1f7
Branch: refs/heads/master
Commit: 3356f1f764135570b948f6614c4816f49fd68dbd
Parents: 4964112 7a4f1d0
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Thu May 31 12:56:10 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Thu May 31 12:56:10 2018 -0500
----------------------------------------------------------------------
.../org/apache/storm/messaging/IContext.java | 1 +
.../apache/storm/messaging/netty/Client.java | 11 ++----
.../apache/storm/messaging/netty/Context.java | 38 ++++++--------------
3 files changed, 14 insertions(+), 36 deletions(-)
----------------------------------------------------------------------