You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/22 23:37:49 UTC

[4/8] storm git commit: ReturnResults should make 3 attempts to send results to DRPC server, in case of thrift exceptions.

ReturnResults should make 3 attempts to send results to DRPC server, in case of thrift exceptions.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/49c4ef6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/49c4ef6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/49c4ef6d

Branch: refs/heads/master
Commit: 49c4ef6d274fc7ca17b623edf1ea53001c8eeaa2
Parents: c9916d3
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Jan 19 18:34:14 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Jan 19 18:34:14 2016 +0000

----------------------------------------------------------------------
 .../org/apache/storm/drpc/ReturnResults.java    | 51 +++++++++++---------
 1 file changed, 28 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/49c4ef6d/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
index 74c4561..a9a5aa1 100644
--- a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
+++ b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -83,35 +83,40 @@ public class ReturnResults extends BaseRichBolt {
                 client = _clients.get(server);
             }
  
-            try {
-                client.result(id, result);
-                _collector.ack(input);
-            } catch (AuthorizationException aze) {
-                LOG.error("Not authorized to return results to DRPC server", aze);
-                _collector.fail(input);
-                if (client instanceof DRPCInvocationsClient) {
-                    try {
-                        LOG.info("reconnecting... ");
-                        ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
-                    } catch (TException e2) {
-                        throw new RuntimeException(e2);
-                    }
-                }
-            } catch(TException e) {
-                LOG.error("Failed to return results to DRPC server", e);
-                _collector.fail(input);
-                if (client instanceof DRPCInvocationsClient) {
-                    try {
-                        LOG.info("reconnecting... ");
-                        ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
-                    } catch (TException e2) {
-                        throw new RuntimeException(e2);
+
+            int retryCnt = 0;
+            int maxRetries = 3;
+            while (retryCnt < maxRetries) {
+                retryCnt++;
+                try {
+                    client.result(id, result);
+                    _collector.ack(input);
+                    break;
+                } catch (AuthorizationException aze) {
+                    LOG.error("Not authorized to return results to DRPC server", aze);
+                    _collector.fail(input);
+                    throw new RuntimeException(aze);
+                } catch (TException tex) {
+                    if (retryCnt >= maxRetries) {
+                        LOG.error("Failed to return results to DRPC server", tex);
+                        _collector.fail(input);
                     }
+                    reconnectClient((DRPCInvocationsClient) client);
                 }
             }
         }
     }    
 
+    private void reconnectClient(DRPCInvocationsClient client) {
+        if (client instanceof DRPCInvocationsClient) {
+            try {
+                LOG.info("reconnecting... ");
+                client.reconnectClient(); //Blocking call
+            } catch (TException e2) {
+                LOG.error("Failed to connect to DRPC server", e2);
+            }
+        }
+    }
     @Override
     public void cleanup() {
         for(DRPCInvocationsClient c: _clients.values()) {