You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by agresch <gi...@git.apache.org> on 2018/06/27 20:24:24 UTC

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

GitHub user agresch opened a pull request:

    https://github.com/apache/storm/pull/2741

    STORM-3124 reconnect to pacemaker on failure

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/agresch/storm agresch_pacemaker_connect

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2741.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2741
    
----
commit 8cd3aa25fd8463a161b684e2674b653aa344efbe
Author: Aaron Gresch <ag...@...>
Date:   2018-06-27T19:57:28Z

    STORM-3124 reconnect to pacemaker on failure

----


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2741#discussion_r199145547
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
    @@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
    -                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
    +            LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
     
    -                if (!saslNettyClient.isComplete()) {
    +            if (!saslNettyClient.isComplete()) {
                     String errorMessage =
                         "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
                     LOG.error(errorMessage);
                     throw new Exception(errorMessage);
    -                }
    +            }
                 ctx.pipeline().remove(this);
                 this.client.channelReady(channel);
     
                 // We call fireChannelRead since the client is allowed to
    -                // perform this request. The client's request will now proceed
    -                // to the next pipeline component namely StormClientHandler.
    +            // perform this request. The client's request will now proceed
    +            // to the next pipeline component namely StormClientHandler.
                 ctx.fireChannelRead(controlMessage);
    -            } else {
    +        } else {
                 LOG.warn("Unexpected control message: {}", controlMessage);
    -            }
    +        }
         }
     
         private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
    -            LOG.debug("Responding to server's token of length: {}",
    -            saslMessageToken.getSaslToken().length);
    -
    -            // Generate SASL response (but we only actually send the response if
    -            // it's non-null.
    -            byte[] responseToServer = saslNettyClient
    -            .saslResponse(saslMessageToken);
    -            if (responseToServer == null) {
    -                // If we generate a null response, then authentication has completed
    -                // (if not, warn), and return without sending a response back to the
    -                // server.
    -                LOG.debug("Response to server is null: authentication should now be complete.");
    -                if (!saslNettyClient.isComplete()) {
    -                    LOG.warn("Generated a null response, but authentication is not complete.");
    -                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    -                }
    -            this.client.channelReady(channel);
    -            } else {
    -                LOG.debug("Response to server token has length: {}",
    -                          responseToServer.length);
    -            // Construct a message containing the SASL response and send it to the
    +        LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
    +
    +        // Generate SASL response (but we only actually send the response if
    +        // it's non-null.
    +        byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
    +        if (responseToServer == null) {
    +            // If we generate a null response, then authentication has completed
    +            // (if not, warn), and return without sending a response back to the
                 // server.
    +            LOG.debug("Response to server is null: authentication should now be complete.");
    +            if (!saslNettyClient.isComplete()) {
    +                LOG.warn("Generated a null response, but authentication is not complete.");
    +                throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    +            }
    +            this.client.channelReady(channel);
    +        } else {
    +            LOG.debug("Response to server token has length: {}",
    +                      responseToServer.length);
    +            // Construct a message containing the SASL response and send it to the server.
                 SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
                 channel.writeAndFlush(saslResponse, channel.voidPromise());
             }
         }
    +
    +    @Override
    +    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    +        super.channelRegistered(ctx);
    +        LOG.debug("channelRegistered {}", ctx);
    --- End diff --
    
    In my case, the channel never went active.  I did not understand what was happening at the time.  By overriding the registered and unregistered, my first clue was that unregister was called immediately after register.  
    
    Since it took me a day and a half to debug this issue and this was my first real clue, I thought it was important to log to save time for debugging future issues.
    
    I will say I do not know anything about Netty though, I am sure you could have solve this much quicker.


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2741#discussion_r199057391
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
    @@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
    -                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
    +            LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
     
    -                if (!saslNettyClient.isComplete()) {
    +            if (!saslNettyClient.isComplete()) {
                     String errorMessage =
                         "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
                     LOG.error(errorMessage);
                     throw new Exception(errorMessage);
    -                }
    +            }
                 ctx.pipeline().remove(this);
                 this.client.channelReady(channel);
     
                 // We call fireChannelRead since the client is allowed to
    -                // perform this request. The client's request will now proceed
    -                // to the next pipeline component namely StormClientHandler.
    +            // perform this request. The client's request will now proceed
    +            // to the next pipeline component namely StormClientHandler.
                 ctx.fireChannelRead(controlMessage);
    -            } else {
    +        } else {
                 LOG.warn("Unexpected control message: {}", controlMessage);
    -            }
    +        }
         }
     
         private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
    -            LOG.debug("Responding to server's token of length: {}",
    -            saslMessageToken.getSaslToken().length);
    -
    -            // Generate SASL response (but we only actually send the response if
    -            // it's non-null.
    -            byte[] responseToServer = saslNettyClient
    -            .saslResponse(saslMessageToken);
    -            if (responseToServer == null) {
    -                // If we generate a null response, then authentication has completed
    -                // (if not, warn), and return without sending a response back to the
    -                // server.
    -                LOG.debug("Response to server is null: authentication should now be complete.");
    -                if (!saslNettyClient.isComplete()) {
    -                    LOG.warn("Generated a null response, but authentication is not complete.");
    -                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    -                }
    -            this.client.channelReady(channel);
    -            } else {
    -                LOG.debug("Response to server token has length: {}",
    -                          responseToServer.length);
    -            // Construct a message containing the SASL response and send it to the
    +        LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
    +
    +        // Generate SASL response (but we only actually send the response if
    +        // it's non-null.
    +        byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
    +        if (responseToServer == null) {
    +            // If we generate a null response, then authentication has completed
    +            // (if not, warn), and return without sending a response back to the
                 // server.
    +            LOG.debug("Response to server is null: authentication should now be complete.");
    +            if (!saslNettyClient.isComplete()) {
    +                LOG.warn("Generated a null response, but authentication is not complete.");
    +                throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    +            }
    +            this.client.channelReady(channel);
    +        } else {
    +            LOG.debug("Response to server token has length: {}",
    +                      responseToServer.length);
    +            // Construct a message containing the SASL response and send it to the server.
                 SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
                 channel.writeAndFlush(saslResponse, channel.voidPromise());
             }
         }
    +
    +    @Override
    +    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    +        super.channelRegistered(ctx);
    +        LOG.debug("channelRegistered {}", ctx);
    --- End diff --
    
    Sorry, didn't get to this in time. It looks good overall, but I'm wondering what these methods are for? If you want to monitor the reconnect, I think channelActive/channelInactive would be a better fit (see https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-channel-state-model)


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2741#discussion_r198638777
  
    --- Diff: storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---
    @@ -153,13 +154,13 @@ public String secretKey() {
         }
     
         public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException {
    -        LOG.debug("Sending message: {}", m.toString());
    +        LOG.debug("Sending pacemaker message to {}: {}", host, m.toString());
    --- End diff --
    
    nit: can we drop the `.toString()` for m? The log command should do it for you, if it is needed.


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2741#discussion_r198736753
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
    @@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
    -                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
    +            LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
     
    -                if (!saslNettyClient.isComplete()) {
    +            if (!saslNettyClient.isComplete()) {
                     String errorMessage =
                         "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
                     LOG.error(errorMessage);
                     throw new Exception(errorMessage);
    -                }
    +            }
                 ctx.pipeline().remove(this);
                 this.client.channelReady(channel);
     
                 // We call fireChannelRead since the client is allowed to
    -                // perform this request. The client's request will now proceed
    -                // to the next pipeline component namely StormClientHandler.
    +            // perform this request. The client's request will now proceed
    +            // to the next pipeline component namely StormClientHandler.
                 ctx.fireChannelRead(controlMessage);
    -            } else {
    +        } else {
                 LOG.warn("Unexpected control message: {}", controlMessage);
    -            }
    +        }
         }
     
         private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
    -            LOG.debug("Responding to server's token of length: {}",
    -            saslMessageToken.getSaslToken().length);
    -
    -            // Generate SASL response (but we only actually send the response if
    -            // it's non-null.
    -            byte[] responseToServer = saslNettyClient
    -            .saslResponse(saslMessageToken);
    -            if (responseToServer == null) {
    -                // If we generate a null response, then authentication has completed
    -                // (if not, warn), and return without sending a response back to the
    -                // server.
    -                LOG.debug("Response to server is null: authentication should now be complete.");
    -                if (!saslNettyClient.isComplete()) {
    -                    LOG.warn("Generated a null response, but authentication is not complete.");
    -                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    -                }
    -            this.client.channelReady(channel);
    -            } else {
    -                LOG.debug("Response to server token has length: {}",
    -                          responseToServer.length);
    -            // Construct a message containing the SASL response and send it to the
    +        LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
    +
    +        // Generate SASL response (but we only actually send the response if
    +        // it's non-null.
    +        byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
    +        if (responseToServer == null) {
    +            // If we generate a null response, then authentication has completed
    +            // (if not, warn), and return without sending a response back to the
                 // server.
    +            LOG.debug("Response to server is null: authentication should now be complete.");
    +            if (!saslNettyClient.isComplete()) {
    +                LOG.warn("Generated a null response, but authentication is not complete.");
    +                throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    +            }
    +            this.client.channelReady(channel);
    +        } else {
    +            LOG.debug("Response to server token has length: {}",
    +                      responseToServer.length);
    +            // Construct a message containing the SASL response and send it to the server.
                 SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
                 channel.writeAndFlush(saslResponse, channel.voidPromise());
             }
         }
    +
    +    @Override
    +    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    +        super.channelRegistered(ctx);
    +        LOG.debug("channelRegistered {}", ctx);
    +    }
    +
    +    @Override
    +    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    +        super.channelUnregistered(ctx);
    +        LOG.debug("channelUnregistered {}", ctx);
    +
    +    }
    +
    +    @Override
    +    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    +        LOG.warn("{} exceptionCaught", ctx, cause);
    +        super.exceptionCaught(ctx, cause);
    --- End diff --
    
    LOG.warn("{} exceptionCaught", ctx, cause) you have one {} but 2 variables here.


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2741#discussion_r198638417
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
    @@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
    -                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
    +            LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
     
    -                if (!saslNettyClient.isComplete()) {
    +            if (!saslNettyClient.isComplete()) {
                     String errorMessage =
                         "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
                     LOG.error(errorMessage);
                     throw new Exception(errorMessage);
    -                }
    +            }
                 ctx.pipeline().remove(this);
                 this.client.channelReady(channel);
     
                 // We call fireChannelRead since the client is allowed to
    -                // perform this request. The client's request will now proceed
    -                // to the next pipeline component namely StormClientHandler.
    +            // perform this request. The client's request will now proceed
    +            // to the next pipeline component namely StormClientHandler.
                 ctx.fireChannelRead(controlMessage);
    -            } else {
    +        } else {
                 LOG.warn("Unexpected control message: {}", controlMessage);
    -            }
    +        }
         }
     
         private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
    -            LOG.debug("Responding to server's token of length: {}",
    -            saslMessageToken.getSaslToken().length);
    -
    -            // Generate SASL response (but we only actually send the response if
    -            // it's non-null.
    -            byte[] responseToServer = saslNettyClient
    -            .saslResponse(saslMessageToken);
    -            if (responseToServer == null) {
    -                // If we generate a null response, then authentication has completed
    -                // (if not, warn), and return without sending a response back to the
    -                // server.
    -                LOG.debug("Response to server is null: authentication should now be complete.");
    -                if (!saslNettyClient.isComplete()) {
    -                    LOG.warn("Generated a null response, but authentication is not complete.");
    -                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    -                }
    -            this.client.channelReady(channel);
    -            } else {
    -                LOG.debug("Response to server token has length: {}",
    -                          responseToServer.length);
    -            // Construct a message containing the SASL response and send it to the
    +        LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
    +
    +        // Generate SASL response (but we only actually send the response if
    +        // it's non-null.
    +        byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
    +        if (responseToServer == null) {
    +            // If we generate a null response, then authentication has completed
    +            // (if not, warn), and return without sending a response back to the
                 // server.
    +            LOG.debug("Response to server is null: authentication should now be complete.");
    +            if (!saslNettyClient.isComplete()) {
    +                LOG.warn("Generated a null response, but authentication is not complete.");
    +                throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    +            }
    +            this.client.channelReady(channel);
    +        } else {
    +            LOG.debug("Response to server token has length: {}",
    +                      responseToServer.length);
    +            // Construct a message containing the SASL response and send it to the server.
                 SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
                 channel.writeAndFlush(saslResponse, channel.voidPromise());
             }
         }
    +
    +    @Override
    +    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    +        super.channelRegistered(ctx);
    +        LOG.info("channelRegistered {}", ctx);
    +    }
    --- End diff --
    
    These look like they are for debugging.  could we make them debug logs instead?


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2741#discussion_r199193610
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
    @@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
             if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
    -                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
    +            LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
     
    -                if (!saslNettyClient.isComplete()) {
    +            if (!saslNettyClient.isComplete()) {
                     String errorMessage =
                         "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
                     LOG.error(errorMessage);
                     throw new Exception(errorMessage);
    -                }
    +            }
                 ctx.pipeline().remove(this);
                 this.client.channelReady(channel);
     
                 // We call fireChannelRead since the client is allowed to
    -                // perform this request. The client's request will now proceed
    -                // to the next pipeline component namely StormClientHandler.
    +            // perform this request. The client's request will now proceed
    +            // to the next pipeline component namely StormClientHandler.
                 ctx.fireChannelRead(controlMessage);
    -            } else {
    +        } else {
                 LOG.warn("Unexpected control message: {}", controlMessage);
    -            }
    +        }
         }
     
         private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
             Channel channel = ctx.channel();
             KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
    -            LOG.debug("Responding to server's token of length: {}",
    -            saslMessageToken.getSaslToken().length);
    -
    -            // Generate SASL response (but we only actually send the response if
    -            // it's non-null.
    -            byte[] responseToServer = saslNettyClient
    -            .saslResponse(saslMessageToken);
    -            if (responseToServer == null) {
    -                // If we generate a null response, then authentication has completed
    -                // (if not, warn), and return without sending a response back to the
    -                // server.
    -                LOG.debug("Response to server is null: authentication should now be complete.");
    -                if (!saslNettyClient.isComplete()) {
    -                    LOG.warn("Generated a null response, but authentication is not complete.");
    -                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    -                }
    -            this.client.channelReady(channel);
    -            } else {
    -                LOG.debug("Response to server token has length: {}",
    -                          responseToServer.length);
    -            // Construct a message containing the SASL response and send it to the
    +        LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
    +
    +        // Generate SASL response (but we only actually send the response if
    +        // it's non-null.
    +        byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
    +        if (responseToServer == null) {
    +            // If we generate a null response, then authentication has completed
    +            // (if not, warn), and return without sending a response back to the
                 // server.
    +            LOG.debug("Response to server is null: authentication should now be complete.");
    +            if (!saslNettyClient.isComplete()) {
    +                LOG.warn("Generated a null response, but authentication is not complete.");
    +                throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
    +            }
    +            this.client.channelReady(channel);
    +        } else {
    +            LOG.debug("Response to server token has length: {}",
    +                      responseToServer.length);
    +            // Construct a message containing the SASL response and send it to the server.
                 SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
                 channel.writeAndFlush(saslResponse, channel.voidPromise());
             }
         }
    +
    +    @Override
    +    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    +        super.channelRegistered(ctx);
    +        LOG.debug("channelRegistered {}", ctx);
    --- End diff --
    
    Thanks for explaining. Happy you solved it.


---

[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2741


---

[GitHub] storm issue #2741: STORM-3124 reconnect to pacemaker on failure

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2741
  
    @revans2 - made the changes you wanted.


---