Posted to dev@storm.apache.org by kishorvpatil <gi...@git.apache.org> on 2018/04/30 22:55:06 UTC
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
GitHub user kishorvpatil opened a pull request:
https://github.com/apache/storm/pull/2651
[STORM-3054] Add Topology level configuration socket timeout for DRPC Invocation Client
This patch makes the following changes:
- Add Topology level configuration socket timeout for DRPC Invocation Client
- Fix the `_clients` map key in `ReturnResults`
- Add `ReturnResults` debug log entries.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kishorvpatil/incubator-storm storm3054
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2651.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2651
----
commit a96581f36e6cae12943c29c42a843e46f5b6ad7e
Author: Kishor Patil <kp...@...>
Date: 2018-04-30T22:39:04Z
Add Topology level configuration socket timeout for DRPC Invocation Client
commit da1cb49f3bcef0df84330422e9e091a65bdc541b
Author: Kishor Patil <kp...@...>
Date: 2018-04-30T22:46:07Z
Fix ReturnResults reconnection logic
commit 0ce231e10e0c49a89f3c3a286b9aadf493a4bc7f
Author: Kishor Patil <kp...@...>
Date: 2018-04-30T22:48:06Z
Add debug statements to ReturnResults
----
---
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r189821240
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +92,34 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
}
}
}
}
- private void reconnectClient(DRPCInvocationsClient client) {
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- client.reconnectClient(); //Blocking call
- } catch (TException e2) {
- LOG.error("Failed to connect to DRPC server", e2);
+ private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
+ DistributedRPCInvocations.Iface client;
+ if (local) {
+ client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
+ } else {
+ List server = new ArrayList() {
+ {
+ add(host);
+ add(port);
+ }
+ };
+ if (!_clients.containsKey(server)) {
+ try {
+ DRPCInvocationsClient oldClient = _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
--- End diff --
This loses the caching behavior. Instead of doing it this way, can we invalidate the cache entry only when we find that the client is broken?
---
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r188107411
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +85,32 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
}
}
}
}
- private void reconnectClient(DRPCInvocationsClient client) {
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- client.reconnectClient(); //Blocking call
- } catch (TException e2) {
- LOG.error("Failed to connect to DRPC server", e2);
+ private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
+ DistributedRPCInvocations.Iface client;
+ if (local) {
+ client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
+ } else {
+ String server = getServer(host, port);
+ if (!_clients.containsKey(server)) {
+ try {
+ _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
+ } catch (org.apache.thrift.transport.TTransportException ex) {
+ throw new RuntimeException(ex);
+ }
}
+ client = _clients.get(server);
}
+ return client;
+ }
+
+ private String getServer(String host, int port) {
+ return host + port;
--- End diff --
Reverting to a List key.
---
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r189420057
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +85,32 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
--- End diff --
@kishorvpatil
I meant that we don't remove the broken `client` from `_clients`, so this will always pick the same `client` instead of building a new one.
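
For readers following the thread, the invalidate-on-failure idea discussed here can be sketched roughly as below. This is only an illustration under stated assumptions, not the code that was merged: `Client`, `ClientCacheDemo`, `getClient`, `invalidate`, and `sendWithRetry` are hypothetical names, and the `Client` interface is a stand-in for the real DRPC invocations client so that the example runs without Storm on the classpath. The host name and port in `main` are placeholders.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class ClientCacheDemo {
    /** Hypothetical stand-in for the DRPC invocations client. */
    interface Client {
        void send(String payload) throws Exception;
    }

    // Cache keyed by (host, port), kept as a List so the two parts stay distinct.
    private final Map<List<Object>, Client> clients = new HashMap<>();
    private final BiFunction<String, Integer, Client> factory;
    int created = 0; // number of clients built so far, for illustration only

    ClientCacheDemo(BiFunction<String, Integer, Client> factory) {
        this.factory = factory;
    }

    // Return the cached client, building one only if the key is absent.
    Client getClient(String host, int port) {
        return clients.computeIfAbsent(Arrays.<Object>asList(host, port), key -> {
            created++;
            return factory.apply(host, port);
        });
    }

    // Drop a client that is known to be broken so the next lookup rebuilds it.
    void invalidate(String host, int port) {
        clients.remove(Arrays.<Object>asList(host, port));
    }

    // Try to send; on failure, invalidate the cached client and retry with a fresh one.
    boolean sendWithRetry(String host, int port, String payload, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Client client = getClient(host, port);
            try {
                client.send(payload);
                return true;
            } catch (Exception e) {
                invalidate(host, port);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] built = {0};
        ClientCacheDemo cache = new ClientCacheDemo((host, port) -> {
            boolean broken = built[0]++ == 0; // only the first client built is broken
            return payload -> {
                if (broken) {
                    throw new Exception("connection reset");
                }
            };
        });
        System.out.println(cache.sendWithRetry("drpc-host", 3773, "result", 3)); // true
        System.out.println(cache.created); // 2
    }
}
```

With this shape the cache is still used on the healthy path, and a new client is built only after the cached one has thrown.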
---
[GitHub] storm issue #2651: [STORM-3054] Add Topology level configuration socket time...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2651
@kishorvpatil any update on the comments from @HeartSaVioR ?
---
[GitHub] storm issue #2651: [STORM-3054] Add Topology level configuration socket time...
Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the issue:
https://github.com/apache/storm/pull/2651
@HeartSaVioR @revans2 sorry for the delay in addressing the review comments.
---
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r186308658
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +85,32 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
}
}
}
}
- private void reconnectClient(DRPCInvocationsClient client) {
- if (client instanceof DRPCInvocationsClient) {
- try {
- LOG.info("reconnecting... ");
- client.reconnectClient(); //Blocking call
- } catch (TException e2) {
- LOG.error("Failed to connect to DRPC server", e2);
+ private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
+ DistributedRPCInvocations.Iface client;
+ if (local) {
+ client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
+ } else {
+ String server = getServer(host, port);
+ if (!_clients.containsKey(server)) {
+ try {
+ _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
+ } catch (org.apache.thrift.transport.TTransportException ex) {
+ throw new RuntimeException(ex);
+ }
}
+ client = _clients.get(server);
}
+ return client;
+ }
+
+ private String getServer(String host, int port) {
+ return host + port;
--- End diff --
Better to keep the key as a List, since the concatenated string can collide when the host ends with a number.
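
As an illustration of the collision described here, the small sketch below compares a concatenated `host + port` key with a List key. The class and method names (`ClientKeyDemo`, `stringKey`, `listKey`) and the host names are hypothetical and chosen only for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClientKeyDemo {
    // Concatenated key, in the style of getServer(host, port) from the diff.
    static String stringKey(String host, int port) {
        return host + port;
    }

    // List key: host and port remain separate elements, so equals/hashCode
    // compare them individually.
    static List<Object> listKey(String host, int port) {
        return Arrays.<Object>asList(host, port);
    }

    public static void main(String[] args) {
        Map<String, String> byString = new HashMap<>();
        byString.put(stringKey("node1", 2345), "client A");
        byString.put(stringKey("node12", 345), "client B"); // same key "node12345", overwrites client A

        Map<List<Object>, String> byList = new HashMap<>();
        byList.put(listKey("node1", 2345), "client A");
        byList.put(listKey("node12", 345), "client B");

        System.out.println(byString.size()); // 1
        System.out.println(byList.size());   // 2
    }
}
```

A string key with a separator such as `host + ":" + port` would also avoid this particular collision; the List key simply keeps the existing convention.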
---
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r186308794
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +85,32 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
--- End diff --
Now it always reuses the existing client, even when a TException is raised from that client. I think we need to handle that case.
---
[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...
Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:
https://github.com/apache/storm/pull/2651#discussion_r188107350
--- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
@@ -103,21 +85,32 @@ public void execute(Tuple input) {
LOG.error("Failed to return results to DRPC server", tex);
_collector.fail(input);
}
- reconnectClient((DRPCInvocationsClient) client);
+ client = getDRPCClient(host, port);
--- End diff --
This is the same as before: the previous code behaved this way too if we could not make a connection or if the reconnect failed.
---