Posted to dev@storm.apache.org by agresch <gi...@git.apache.org> on 2018/06/27 20:24:24 UTC
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
GitHub user agresch opened a pull request:
https://github.com/apache/storm/pull/2741
STORM-3124 reconnect to pacemaker on failure
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/agresch/storm agresch_pacemaker_connect
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2741.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2741
----
commit 8cd3aa25fd8463a161b684e2674b653aa344efbe
Author: Aaron Gresch <ag...@...>
Date: 2018-06-27T19:57:28Z
STORM-3124 reconnect to pacemaker on failure
----
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2741#discussion_r199145547
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
@@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
- if (!saslNettyClient.isComplete()) {
+ if (!saslNettyClient.isComplete()) {
String errorMessage =
"Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
LOG.error(errorMessage);
throw new Exception(errorMessage);
- }
+ }
ctx.pipeline().remove(this);
this.client.channelReady(channel);
// We call fireChannelRead since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
ctx.fireChannelRead(controlMessage);
- } else {
+ } else {
LOG.warn("Unexpected control message: {}", controlMessage);
- }
+ }
}
private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
- LOG.debug("Responding to server's token of length: {}",
- saslMessageToken.getSaslToken().length);
-
- // Generate SASL response (but we only actually send the response if
- // it's non-null.
- byte[] responseToServer = saslNettyClient
- .saslResponse(saslMessageToken);
- if (responseToServer == null) {
- // If we generate a null response, then authentication has completed
- // (if not, warn), and return without sending a response back to the
- // server.
- LOG.debug("Response to server is null: authentication should now be complete.");
- if (!saslNettyClient.isComplete()) {
- LOG.warn("Generated a null response, but authentication is not complete.");
- throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
- }
- this.client.channelReady(channel);
- } else {
- LOG.debug("Response to server token has length: {}",
- responseToServer.length);
- // Construct a message containing the SASL response and send it to the
+ LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
// server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady(channel);
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ // Construct a message containing the SASL response and send it to the server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
channel.writeAndFlush(saslResponse, channel.voidPromise());
}
}
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelRegistered(ctx);
+ LOG.debug("channelRegistered {}", ctx);
--- End diff --
In my case, the channel never went active, and I did not understand what was happening at the time. After I overrode the registered and unregistered callbacks, my first clue was that unregister was called immediately after register.
Since it took me a day and a half to debug this issue and this was my first real clue, I thought it was important to log these events to save time when debugging future issues.
I will say that I do not know anything about Netty, though; I am sure you could have solved this much quicker.
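The symptom described above (register followed directly by unregister, with no active event in between) can be expressed as a small check over the logged callback names. This is only a sketch in plain Java, not Storm or Netty code: the class and method names are hypothetical, and the event strings simply reuse the Netty 4 callback names (channelRegistered, channelActive, channelInactive, channelUnregistered).

```java
import java.util.Arrays;
import java.util.List;

public class ChannelLifecycleCheck {
    /**
     * Returns true when the recorded callbacks show a channel that was
     * registered and unregistered without ever becoming active.
     */
    static boolean neverWentActive(List<String> events) {
        return events.contains("channelRegistered")
            && events.contains("channelUnregistered")
            && !events.contains("channelActive");
    }

    public static void main(String[] args) {
        // Sequence matching the failure described in the comment.
        List<String> failed = Arrays.asList("channelRegistered", "channelUnregistered");
        // Sequence for a connection that came up and was later closed.
        List<String> healthy = Arrays.asList(
            "channelRegistered", "channelActive", "channelInactive", "channelUnregistered");

        System.out.println("failed connect detected: " + neverWentActive(failed));
        System.out.println("healthy connect flagged: " + neverWentActive(healthy));
    }
}
```

Without the registered/unregistered debug lines, the failing case would produce no lifecycle output at all, which seems to be why the extra logging was considered useful here.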
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by srdo <gi...@git.apache.org>.
GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2741#discussion_r199057391
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
@@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
- if (!saslNettyClient.isComplete()) {
+ if (!saslNettyClient.isComplete()) {
String errorMessage =
"Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
LOG.error(errorMessage);
throw new Exception(errorMessage);
- }
+ }
ctx.pipeline().remove(this);
this.client.channelReady(channel);
// We call fireChannelRead since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
ctx.fireChannelRead(controlMessage);
- } else {
+ } else {
LOG.warn("Unexpected control message: {}", controlMessage);
- }
+ }
}
private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
- LOG.debug("Responding to server's token of length: {}",
- saslMessageToken.getSaslToken().length);
-
- // Generate SASL response (but we only actually send the response if
- // it's non-null.
- byte[] responseToServer = saslNettyClient
- .saslResponse(saslMessageToken);
- if (responseToServer == null) {
- // If we generate a null response, then authentication has completed
- // (if not, warn), and return without sending a response back to the
- // server.
- LOG.debug("Response to server is null: authentication should now be complete.");
- if (!saslNettyClient.isComplete()) {
- LOG.warn("Generated a null response, but authentication is not complete.");
- throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
- }
- this.client.channelReady(channel);
- } else {
- LOG.debug("Response to server token has length: {}",
- responseToServer.length);
- // Construct a message containing the SASL response and send it to the
+ LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
// server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady(channel);
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ // Construct a message containing the SASL response and send it to the server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
channel.writeAndFlush(saslResponse, channel.voidPromise());
}
}
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelRegistered(ctx);
+ LOG.debug("channelRegistered {}", ctx);
--- End diff --
Sorry, didn't get to this in time. It looks good overall, but I'm wondering what these methods are for? If you want to monitor the reconnect, I think channelActive/channelInactive would be a better fit (see https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-channel-state-model)
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2741#discussion_r198638777
--- Diff: storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---
@@ -153,13 +154,13 @@ public String secretKey() {
}
public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException {
- LOG.debug("Sending message: {}", m.toString());
+ LOG.debug("Sending pacemaker message to {}: {}", host, m.toString());
--- End diff --
nit: can we drop the `.toString()` for m? The log command should do it for you, if it is needed.
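The reason for the nit can be illustrated with a small, self-contained sketch. It uses java.util.logging from the JDK as a stand-in, on the assumption that the thread's LOG is an SLF4J logger, which defers formatting of `{}` arguments in a similar way. CountingMessage is a hypothetical placeholder for HBMessage, and the class and logger names are made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LazyLogArgs {
    /** Hypothetical stand-in for HBMessage that counts toString() calls. */
    static final class CountingMessage {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "HBMessage(...)";
        }
    }

    /** Logger with FINE (debug-like) output switched off. */
    private static Logger quietLogger() {
        Logger log = Logger.getLogger("lazy-log-args-demo");
        log.setLevel(Level.INFO);
        return log;
    }

    /** Explicit toString(): the string is built even though nothing is logged. */
    static int eagerCalls() {
        CountingMessage m = new CountingMessage();
        quietLogger().log(Level.FINE, "Sending pacemaker message: " + m.toString());
        return m.toStringCalls.get();
    }

    /** Message passed as a parameter: no formatting happens while the level is off. */
    static int deferredCalls() {
        CountingMessage m = new CountingMessage();
        quietLogger().log(Level.FINE, "Sending pacemaker message: {0}", m);
        return m.toStringCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("eager toString() calls: " + eagerCalls());
        System.out.println("deferred toString() calls: " + deferredCalls());
    }
}
```

Passing the object itself leaves the string conversion to the logging framework, so the cost is only paid when the debug level is actually enabled.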
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by danny0405 <gi...@git.apache.org>.
GitHub user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2741#discussion_r198736753
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
@@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
- if (!saslNettyClient.isComplete()) {
+ if (!saslNettyClient.isComplete()) {
String errorMessage =
"Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
LOG.error(errorMessage);
throw new Exception(errorMessage);
- }
+ }
ctx.pipeline().remove(this);
this.client.channelReady(channel);
// We call fireChannelRead since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
ctx.fireChannelRead(controlMessage);
- } else {
+ } else {
LOG.warn("Unexpected control message: {}", controlMessage);
- }
+ }
}
private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
- LOG.debug("Responding to server's token of length: {}",
- saslMessageToken.getSaslToken().length);
-
- // Generate SASL response (but we only actually send the response if
- // it's non-null.
- byte[] responseToServer = saslNettyClient
- .saslResponse(saslMessageToken);
- if (responseToServer == null) {
- // If we generate a null response, then authentication has completed
- // (if not, warn), and return without sending a response back to the
- // server.
- LOG.debug("Response to server is null: authentication should now be complete.");
- if (!saslNettyClient.isComplete()) {
- LOG.warn("Generated a null response, but authentication is not complete.");
- throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
- }
- this.client.channelReady(channel);
- } else {
- LOG.debug("Response to server token has length: {}",
- responseToServer.length);
- // Construct a message containing the SASL response and send it to the
+ LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
// server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady(channel);
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ // Construct a message containing the SASL response and send it to the server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
channel.writeAndFlush(saslResponse, channel.voidPromise());
}
}
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelRegistered(ctx);
+ LOG.debug("channelRegistered {}", ctx);
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelUnregistered(ctx);
+ LOG.debug("channelUnregistered {}", ctx);
+
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ LOG.warn("{} exceptionCaught", ctx, cause);
+ super.exceptionCaught(ctx, cause);
--- End diff --
`LOG.warn("{} exceptionCaught", ctx, cause)`: you have one `{}` but two arguments here.
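A note on the call in question: as far as I know, SLF4J (1.6 and later) treats a trailing Throwable as the exception to log when there are fewer `{}` placeholders than arguments, so it may be worth checking against the SLF4J version Storm uses before changing the call. The sketch below shows the same idea with java.util.logging from the JDK, used only as a stand-in so the example has no dependencies. The class name, logger name, and handler are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class ThrowableLogDemo {
    /** Handler that keeps records in memory so they can be inspected. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Logs a warning with the context in the message and the cause attached separately. */
    static LogRecord logWithCause(Object ctx, Throwable cause) {
        Logger log = Logger.getLogger("throwable-log-demo");
        log.setUseParentHandlers(false); // keep the demo quiet on the console
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);
        try {
            log.log(Level.WARNING, ctx + " exceptionCaught", cause);
        } finally {
            log.removeHandler(handler);
        }
        return handler.records.get(0);
    }

    public static void main(String[] args) {
        LogRecord record = logWithCause("ctx", new RuntimeException("boom"));
        System.out.println("message: " + record.getMessage());
        System.out.println("thrown:  " + record.getThrown());
    }
}
```

The point of the sketch is that the exception travels as its own field of the log record rather than as formatted message text, which is what allows a logging backend to print the full stack trace.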
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by revans2 <gi...@git.apache.org>.
GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2741#discussion_r198638417
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
@@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
- if (!saslNettyClient.isComplete()) {
+ if (!saslNettyClient.isComplete()) {
String errorMessage =
"Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
LOG.error(errorMessage);
throw new Exception(errorMessage);
- }
+ }
ctx.pipeline().remove(this);
this.client.channelReady(channel);
// We call fireChannelRead since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
ctx.fireChannelRead(controlMessage);
- } else {
+ } else {
LOG.warn("Unexpected control message: {}", controlMessage);
- }
+ }
}
private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
- LOG.debug("Responding to server's token of length: {}",
- saslMessageToken.getSaslToken().length);
-
- // Generate SASL response (but we only actually send the response if
- // it's non-null.
- byte[] responseToServer = saslNettyClient
- .saslResponse(saslMessageToken);
- if (responseToServer == null) {
- // If we generate a null response, then authentication has completed
- // (if not, warn), and return without sending a response back to the
- // server.
- LOG.debug("Response to server is null: authentication should now be complete.");
- if (!saslNettyClient.isComplete()) {
- LOG.warn("Generated a null response, but authentication is not complete.");
- throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
- }
- this.client.channelReady(channel);
- } else {
- LOG.debug("Response to server token has length: {}",
- responseToServer.length);
- // Construct a message containing the SASL response and send it to the
+ LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
// server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady(channel);
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ // Construct a message containing the SASL response and send it to the server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
channel.writeAndFlush(saslResponse, channel.voidPromise());
}
}
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelRegistered(ctx);
+ LOG.info("channelRegistered {}", ctx);
+ }
--- End diff --
These look like they are for debugging. Could we make them debug logs instead?
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by srdo <gi...@git.apache.org>.
GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2741#discussion_r199193610
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/KerberosSaslClientHandler.java ---
@@ -96,53 +96,69 @@ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage cont
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
- if (!saslNettyClient.isComplete()) {
+ if (!saslNettyClient.isComplete()) {
String errorMessage =
"Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
LOG.error(errorMessage);
throw new Exception(errorMessage);
- }
+ }
ctx.pipeline().remove(this);
this.client.channelReady(channel);
// We call fireChannelRead since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
ctx.fireChannelRead(controlMessage);
- } else {
+ } else {
LOG.warn("Unexpected control message: {}", controlMessage);
- }
+ }
}
private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
Channel channel = ctx.channel();
KerberosSaslNettyClient saslNettyClient = getChannelSaslClient(channel);
- LOG.debug("Responding to server's token of length: {}",
- saslMessageToken.getSaslToken().length);
-
- // Generate SASL response (but we only actually send the response if
- // it's non-null.
- byte[] responseToServer = saslNettyClient
- .saslResponse(saslMessageToken);
- if (responseToServer == null) {
- // If we generate a null response, then authentication has completed
- // (if not, warn), and return without sending a response back to the
- // server.
- LOG.debug("Response to server is null: authentication should now be complete.");
- if (!saslNettyClient.isComplete()) {
- LOG.warn("Generated a null response, but authentication is not complete.");
- throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
- }
- this.client.channelReady(channel);
- } else {
- LOG.debug("Response to server token has length: {}",
- responseToServer.length);
- // Construct a message containing the SASL response and send it to the
+ LOG.debug("Responding to server's token of length: {}", saslMessageToken.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient.saslResponse(saslMessageToken);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
// server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady(channel);
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ // Construct a message containing the SASL response and send it to the server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
channel.writeAndFlush(saslResponse, channel.voidPromise());
}
}
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ super.channelRegistered(ctx);
+ LOG.debug("channelRegistered {}", ctx);
--- End diff --
Thanks for explaining. Happy you solved it.
---
[GitHub] storm pull request #2741: STORM-3124 reconnect to pacemaker on failure
Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:
https://github.com/apache/storm/pull/2741
---
[GitHub] storm issue #2741: STORM-3124 reconnect to pacemaker on failure
Posted by agresch <gi...@git.apache.org>.
GitHub user agresch commented on the issue:
https://github.com/apache/storm/pull/2741
@revans2 - made the changes you wanted.
---