Posted to dev@storm.apache.org by kishorvpatil <gi...@git.apache.org> on 2018/04/30 22:55:06 UTC

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

GitHub user kishorvpatil opened a pull request:

    https://github.com/apache/storm/pull/2651

    [STORM-3054] Add Topology level configuration socket timeout for DRPC Invocation Client

    This patch makes the following changes:
    
    - Add Topology level configuration socket timeout for DRPC Invocation Client
    - Fix the `_clients` map key in `ReturnResults`
    - Add `ReturnResults` debug log entries.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kishorvpatil/incubator-storm storm3054

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2651.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2651
    
----
commit a96581f36e6cae12943c29c42a843e46f5b6ad7e
Author: Kishor Patil <kp...@...>
Date:   2018-04-30T22:39:04Z

    Add Topology level configuration socket timeout for DRPC Invocation Client

commit da1cb49f3bcef0df84330422e9e091a65bdc541b
Author: Kishor Patil <kp...@...>
Date:   2018-04-30T22:46:07Z

    Fix ReturnResults reconnection logic

commit 0ce231e10e0c49a89f3c3a286b9aadf493a4bc7f
Author: Kishor Patil <kp...@...>
Date:   2018-04-30T22:48:06Z

    Add debug statements to ReturnResults

----


---

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r189821240
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +92,34 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
                     }
                 }
             }
         }
     
    -    private void reconnectClient(DRPCInvocationsClient client) {
    -        if (client instanceof DRPCInvocationsClient) {
    -            try {
    -                LOG.info("reconnecting... ");
    -                client.reconnectClient(); //Blocking call
    -            } catch (TException e2) {
    -                LOG.error("Failed to connect to DRPC server", e2);
    +    private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
    +        DistributedRPCInvocations.Iface client;
    +        if (local) {
    +            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
    +        } else {
    +            List server = new ArrayList() {
    +                {
    +                    add(host);
    +                    add(port);
    +                }
    +            };
    +            if (!_clients.containsKey(server)) {
    +                try {
    +                    DRPCInvocationsClient oldClient = _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
    --- End diff --
    
    This now loses the caching behavior. Instead of doing it this way, can we invalidate the cache entry when we find that the client is broken?
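
The invalidate-on-failure idea in this comment can be sketched roughly as below. This is a minimal illustration, not the code from the pull request: `InvalidatingClientCache`, its `factory` parameter, and the `get`/`invalidate` methods are hypothetical names, and a generic type `C` stands in for the real client so the sketch compiles without Storm on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for a per-server client cache; not Storm's ReturnResults.
final class InvalidatingClientCache<C> {
    // Keyed by a (host, port) list so that distinct servers never share a key.
    private final Map<List<Object>, C> clients = new HashMap<>();
    // Assumed factory that builds a new client for a host and port.
    private final BiFunction<String, Integer, C> factory;

    InvalidatingClientCache(BiFunction<String, Integer, C> factory) {
        this.factory = factory;
    }

    // Returns the cached client for (host, port), creating one only if absent.
    C get(String host, int port) {
        return clients.computeIfAbsent(key(host, port), k -> factory.apply(host, port));
    }

    // Drops a client that is known to be broken and returns it (or null),
    // so that the next get() builds a fresh one instead of reusing it.
    C invalidate(String host, int port) {
        return clients.remove(key(host, port));
    }

    private static List<Object> key(String host, int port) {
        return Arrays.<Object>asList(host, port);
    }
}
```

In this sketch the caller would call `invalidate(host, port)` from the exception handler and then `get(host, port)` again. A real implementation would presumably also close the removed client; that step is left out here.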


---

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r188107411
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +85,32 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
                     }
                 }
             }
         }
     
    -    private void reconnectClient(DRPCInvocationsClient client) {
    -        if (client instanceof DRPCInvocationsClient) {
    -            try {
    -                LOG.info("reconnecting... ");
    -                client.reconnectClient(); //Blocking call
    -            } catch (TException e2) {
    -                LOG.error("Failed to connect to DRPC server", e2);
    +    private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
    +        DistributedRPCInvocations.Iface client;
    +        if (local) {
    +            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
    +        } else {
    +            String server = getServer(host, port);
    +            if (!_clients.containsKey(server)) {
    +                try {
    +                    _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
    +                } catch (org.apache.thrift.transport.TTransportException ex) {
    +                    throw new RuntimeException(ex);
    +                }
                 }
    +            client = _clients.get(server);
             }
    +        return client;
    +    }
    +
    +    private String getServer(String host, int port) {
    +        return host + port;
    --- End diff --
    
    Reverting to List.


---

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r189420057
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +85,32 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
    --- End diff --
    
    @kishorvpatil 
    I meant that we don't invalidate the broken `client` in `_clients`, so it will always pick the same `client` instead of building a new one.


---

[GitHub] storm issue #2651: [STORM-3054] Add Topology level configuration socket time...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2651
  
    @kishorvpatil any update on the comments from @HeartSaVioR ?


---

[GitHub] storm issue #2651: [STORM-3054] Add Topology level configuration socket time...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on the issue:

    https://github.com/apache/storm/pull/2651
  
    @HeartSaVioR @revans2 sorry for the delay in addressing the review comments.


---

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r186308658
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +85,32 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
                     }
                 }
             }
         }
     
    -    private void reconnectClient(DRPCInvocationsClient client) {
    -        if (client instanceof DRPCInvocationsClient) {
    -            try {
    -                LOG.info("reconnecting... ");
    -                client.reconnectClient(); //Blocking call
    -            } catch (TException e2) {
    -                LOG.error("Failed to connect to DRPC server", e2);
    +    private DistributedRPCInvocations.Iface getDRPCClient(String host, int port) {
    +        DistributedRPCInvocations.Iface client;
    +        if (local) {
    +            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
    +        } else {
    +            String server = getServer(host, port);
    +            if (!_clients.containsKey(server)) {
    +                try {
    +                    _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
    +                } catch (org.apache.thrift.transport.TTransportException ex) {
    +                    throw new RuntimeException(ex);
    +                }
                 }
    +            client = _clients.get(server);
             }
    +        return client;
    +    }
    +
    +    private String getServer(String host, int port) {
    +        return host + port;
    --- End diff --
    
    Better to keep the key as a List, since a bad case can happen with `host + port` (for example, when the host ends with a number).
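
To make the bad case concrete, here is a small sketch of how a concatenated key can collide while a list key does not. The class `ServerKeyDemo`, its two helpers, and the host names and ports are made up for illustration; only the `host + port` expression comes from the diff above.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: these helpers are hypothetical, not Storm code.
final class ServerKeyDemo {
    // Builds the key by plain concatenation, as getServer(host, port) does in the diff.
    static String concatKey(String host, int port) {
        return host + port;
    }

    // Builds the key as a two-element list, keeping host and port separate.
    static List<Object> listKey(String host, int port) {
        return Arrays.<Object>asList(host, port);
    }

    public static void main(String[] args) {
        // Two different servers whose concatenated keys are both "drpc12345".
        System.out.println(concatKey("drpc1", 2345).equals(concatKey("drpc12", 345)));
        // The list keys compare element by element, so they stay distinct.
        System.out.println(listKey("drpc1", 2345).equals(listKey("drpc12", 345)));
    }
}
```

With a concatenated key, both servers would map to the same cached client. The list key avoids this because `List.equals` compares the host and the port separately.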


---

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r186308794
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +85,32 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
    --- End diff --
    
    Now it always reuses the existing client, even when a TException is raised from that client. I guess we need to handle this case.


---

[GitHub] storm pull request #2651: [STORM-3054] Add Topology level configuration sock...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2651#discussion_r188107350
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java ---
    @@ -103,21 +85,32 @@ public void execute(Tuple input) {
                             LOG.error("Failed to return results to DRPC server", tex);
                             _collector.fail(input);
                         }
    -                    reconnectClient((DRPCInvocationsClient) client);
    +                    client = getDRPCClient(host, port);
    --- End diff --
    
    This is the same as before in the cases where we could not make a connection or the reconnect failed.


---