You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/09/05 22:26:32 UTC

kudu git commit: KUDU-2130: java client: handle termination during negotiation edge case

Repository: kudu
Updated Branches:
  refs/heads/master 5583d6200 -> f0aa3b3f1


KUDU-2130: java client: handle termination during negotiation edge case

There was an edge case where a Connection can be terminated while negotiation
is completing. This would result in a scary looking log message:

  18:24:07.776 [DEBUG - New I/O worker #8112] (Connection.java:649) [peer master-127.32.133.1:64032] cleaning up while in state NEGOTIATING due to: connection disconnected
  18:24:07.781 [ERROR - New I/O worker #8112] (Connection.java:418) [peer master-127.32.133.1:64032] unexpected exception from downstream on [id: 0xdd52bacc, /127.0.0.1:55318 :> /127.32.133.1:64032]
  java.lang.NullPointerException
     at org.apache.kudu.client.Connection.messageReceived(Connection.java:271)
      at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
      at org.apache.kudu.client.Connection.handleUpstream(Connection.java:236)
      at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
      at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)

but in reality the error message is harmless; it just indicates that the
connection has been terminated while the connection's messageReceived handler
is clearing the message queue. This interruption is possible because of
82a8e9f99, which fixed a deadlock in Connection. This commit recognizes when
this race has occured, and early exits from messageReceived.

Change-Id: I3e9d4a6535ae82973743e4ac1071de0aac92b08b
Reviewed-on: http://gerrit.cloudera.org:8080/7960
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f0aa3b3f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f0aa3b3f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f0aa3b3f

Branch: refs/heads/master
Commit: f0aa3b3f194146760597e6ab88c304c6f408073c
Parents: 5583d62
Author: Dan Burkert <da...@apache.org>
Authored: Tue Sep 5 13:39:06 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Sep 5 22:25:35 2017 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/kudu/client/Connection.java   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f0aa3b3f/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
index f21d2bb..8b1febb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -261,14 +261,14 @@ class Connection extends SimpleChannelUpstreamHandler {
     if (m instanceof Negotiator.Success) {
       lock.lock();
       try {
-        Preconditions.checkState(state == State.NEGOTIATING);
-        Preconditions.checkState(inflightMessages.isEmpty());
         negotiationResult = (Negotiator.Success) m;
+        Preconditions.checkState(state == State.TERMINATED || inflightMessages.isEmpty());
 
         // Before switching to the READY state, it's necessary to empty the queuedMessages. There
         // might be concurrent activity on adding new messages into the queue if enqueueMessage()
         // is called in the middle.
-        while (!queuedMessages.isEmpty()) {
+        while (state != State.TERMINATED && !queuedMessages.isEmpty()) {
+
           // Register the messages into the inflightMessages before sending them to the wire. This
           // is to be able to invoke appropriate callback when the response received. This should
           // be done under the lock since the inflightMessages itself does not provide any
@@ -292,8 +292,13 @@ class Connection extends SimpleChannelUpstreamHandler {
             lock.lock();
           }
         }
+        // The connection may have been terminated while the lock was dropped.
+        if (state == State.TERMINATED) {
+          return;
+        }
+
+        Preconditions.checkState(state == State.NEGOTIATING);
 
-        assert queuedMessages.isEmpty();
         queuedMessages = null;
         // Set the state to READY -- that means the incoming messages should be no longer put into
         // the queuedMessages, but sent to wire right away (see the enqueueMessage() for details).