You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/01/25 00:29:20 UTC
[12/50] storm git commit: DRPCClient should keep trying to connect to DRPC server forever
DRPCClient should keep trying to connect to DRPC server forever
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9916d37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9916d37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9916d37
Branch: refs/heads/1.x-branch
Commit: c9916d37357c5bdc2795249a7ad8aee3d05e46d4
Parents: d3657e2
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Jan 19 17:58:43 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Jan 19 17:58:43 2016 +0000
----------------------------------------------------------------------
.../org/apache/storm/security/auth/TBackoffConnect.java | 10 ++++++++--
.../jvm/org/apache/storm/security/auth/ThriftClient.java | 4 +++-
storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java | 2 ++
3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c9916d37/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java b/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
index 688b130..0ca9ead 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
@@ -30,15 +30,21 @@ public class TBackoffConnect {
private int _completedRetries = 0;
private int _retryTimes;
private StormBoundedExponentialBackoffRetry waitGrabber;
+ private boolean _retryForever = false;
- public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) {
+ public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling, boolean retryForever) {
+ _retryForever = retryForever;
_retryTimes = retryTimes;
waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval,
retryIntervalCeiling,
retryTimes);
}
+ public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) {
+ this(retryTimes, retryInterval, retryIntervalCeiling, false);
+ }
+
public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host, String asUser) throws IOException {
boolean connected = false;
TTransport transportResult = null;
@@ -71,6 +77,6 @@ public class TBackoffConnect {
}
private boolean canRetry() {
- return (_completedRetries < _retryTimes);
+ return _retryForever || (_completedRetries < _retryTimes);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/c9916d37/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index 1ea4a08..0a8e515 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -36,6 +36,7 @@ public class ThriftClient {
private Map _conf;
private ThriftConnectionType _type;
private String _asUser;
+ protected boolean _retryForever = false;
public ThriftClient(Map storm_conf, ThriftConnectionType type, String host) {
this(storm_conf, type, host, null, null, null);
@@ -93,7 +94,8 @@ public class ThriftClient {
= new TBackoffConnect(
Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
- Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
+ Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)),
+ _retryForever);
_transport = connectionRetry.doConnectWithRetry(transportPlugin, socket, _host, _asUser);
} catch (IOException ex) {
throw new RuntimeException(ex);
http://git-wip-us.apache.org/repos/asf/storm/blob/c9916d37/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java b/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
index ec542ac..7f83789 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
@@ -37,6 +37,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
public DRPCClient(Map conf, String host, int port) throws TTransportException {
this(conf, host, port, null);
+ _retryForever = true;
}
public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
@@ -44,6 +45,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
this.host = host;
this.port = port;
this.client = new DistributedRPC.Client(_protocol);
+ _retryForever = true;
}
public String getHost() {