You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/01/25 00:29:21 UTC
[13/50] storm git commit: ReturnResults should make 3 attempts to send results to DRPC server, in case of thrift exceptions.
ReturnResults should make 3 attempts to send results to DRPC server, in case of thrift exceptions.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/49c4ef6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/49c4ef6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/49c4ef6d
Branch: refs/heads/1.x-branch
Commit: 49c4ef6d274fc7ca17b623edf1ea53001c8eeaa2
Parents: c9916d3
Author: Kishor Patil <kp...@yahoo-inc.com>
Authored: Tue Jan 19 18:34:14 2016 +0000
Committer: Kishor Patil <kp...@yahoo-inc.com>
Committed: Tue Jan 19 18:34:14 2016 +0000
----------------------------------------------------------------------
.../org/apache/storm/drpc/ReturnResults.java | 51 +++++++++++---------
1 file changed, 28 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/49c4ef6d/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
index 74c4561..a9a5aa1 100644
--- a/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
+++ b/storm-core/src/jvm/org/apache/storm/drpc/ReturnResults.java
@@ -83,35 +83,40 @@ public class ReturnResults extends BaseRichBolt {
client = _clients.get(server);
}
- try {
- client.result(id, result);
- _collector.ack(input);
- } catch (AuthorizationException aze) {
- LOG.error("Not authorized to return results to DRPC server", aze);
- _collector.fail(input);
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
- } catch (TException e2) {
- throw new RuntimeException(e2);
- }
- }
- } catch(TException e) {
- LOG.error("Failed to return results to DRPC server", e);
- _collector.fail(input);
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
- } catch (TException e2) {
- throw new RuntimeException(e2);
+
+ int retryCnt = 0;
+ int maxRetries = 3;
+ while (retryCnt < maxRetries) {
+ retryCnt++;
+ try {
+ client.result(id, result);
+ _collector.ack(input);
+ break;
+ } catch (AuthorizationException aze) {
+ LOG.error("Not authorized to return results to DRPC server", aze);
+ _collector.fail(input);
+ throw new RuntimeException(aze);
+ } catch (TException tex) {
+ if (retryCnt >= maxRetries) {
+ LOG.error("Failed to return results to DRPC server", tex);
+ _collector.fail(input);
}
+ reconnectClient((DRPCInvocationsClient) client);
}
}
}
}
+ private void reconnectClient(DRPCInvocationsClient client) {
+ if (client instanceof DRPCInvocationsClient) {
+ try {
+ LOG.info("reconnecting... ");
+ client.reconnectClient(); //Blocking call
+ } catch (TException e2) {
+ LOG.error("Failed to connect to DRPC server", e2);
+ }
+ }
+ }
@Override
public void cleanup() {
for(DRPCInvocationsClient c: _clients.values()) {