You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/01/25 00:29:20 UTC

[12/50] storm git commit: DRPCClient should keep trying to connect to DRPC server forever

DRPCClient should keep trying to connect to DRPC server forever


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9916d37
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9916d37
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9916d37

Branch: refs/heads/1.x-branch
Commit: c9916d37357c5bdc2795249a7ad8aee3d05e46d4
Parents: d3657e2
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Jan 19 17:58:43 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Jan 19 17:58:43 2016 +0000

----------------------------------------------------------------------
 .../org/apache/storm/security/auth/TBackoffConnect.java   | 10 ++++++++--
 .../jvm/org/apache/storm/security/auth/ThriftClient.java  |  4 +++-
 storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java |  2 ++
 3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c9916d37/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java b/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
index 688b130..0ca9ead 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/TBackoffConnect.java
@@ -30,15 +30,21 @@ public class TBackoffConnect {
     private int _completedRetries = 0;
     private int _retryTimes;
     private StormBoundedExponentialBackoffRetry waitGrabber;
+    private boolean _retryForever = false;
 
-    public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) {
+    public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling, boolean retryForever) {
 
+        _retryForever = retryForever;
         _retryTimes = retryTimes;
         waitGrabber = new StormBoundedExponentialBackoffRetry(retryInterval,
                                                               retryIntervalCeiling,
                                                               retryTimes);
     }
 
+    public TBackoffConnect(int retryTimes, int retryInterval, int retryIntervalCeiling) {
+        this(retryTimes, retryInterval, retryIntervalCeiling, false);
+    }
+
     public TTransport doConnectWithRetry(ITransportPlugin transportPlugin, TTransport underlyingTransport, String host, String asUser) throws IOException {
         boolean connected = false;
         TTransport transportResult = null;
@@ -71,6 +77,6 @@ public class TBackoffConnect {
     }
 
     private boolean canRetry() {
-        return (_completedRetries < _retryTimes);
+        return _retryForever || (_completedRetries < _retryTimes);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c9916d37/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
index 1ea4a08..0a8e515 100644
--- a/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
+++ b/storm-core/src/jvm/org/apache/storm/security/auth/ThriftClient.java
@@ -36,6 +36,7 @@ public class ThriftClient {
     private Map _conf;
     private ThriftConnectionType _type;
     private String _asUser;
+    protected boolean _retryForever = false;
 
     public ThriftClient(Map storm_conf, ThriftConnectionType type, String host) {
         this(storm_conf, type, host, null, null, null);
@@ -93,7 +94,8 @@ public class ThriftClient {
                 = new TBackoffConnect(
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_TIMES)),
                                       Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL)),
-                                      Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)));
+                                      Utils.getInt(_conf.get(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING)),
+                                      _retryForever);
             _transport = connectionRetry.doConnectWithRetry(transportPlugin, socket, _host, _asUser);
         } catch (IOException ex) {
             throw new RuntimeException(ex);

http://git-wip-us.apache.org/repos/asf/storm/blob/c9916d37/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java b/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
index ec542ac..7f83789 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DRPCClient.java
@@ -37,6 +37,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
 
     public DRPCClient(Map conf, String host, int port) throws TTransportException {
         this(conf, host, port, null);
+        _retryForever = true;
     }
 
     public DRPCClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
@@ -44,6 +45,7 @@ public class DRPCClient extends ThriftClient implements DistributedRPC.Iface {
         this.host = host;
         this.port = port;
         this.client = new DistributedRPC.Client(_protocol);
+        _retryForever = true;
     }
         
     public String getHost() {