You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/01/25 20:45:23 UTC

[geode] branch feature/GEODE-2113e updated: fixed race condition between Sender Handshake and Sender Thread

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-2113e
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-2113e by this push:
     new 9331843  fixed race condition between Sender Handshake and Sender Thread
9331843 is described below

commit 93318439b0d893e17eb996eb520569e1c62fe79b
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Jan 25 12:44:33 2019 -0800

    fixed race condition between Sender Handshake and Sender Thread
---
 .../monitoring/ThreadsMonitoringProcess.java       |  3 +-
 .../apache/geode/internal/net/NioPlainEngine.java  | 39 +++++++--
 .../org/apache/geode/internal/tcp/Connection.java  | 95 +++++++++++++---------
 .../org/apache/geode/internal/tcp/MsgReader.java   | 79 +++++++++++++++---
 4 files changed, 158 insertions(+), 58 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
index ebd4ce1..d6b3344 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
@@ -60,7 +60,8 @@ public class ThreadsMonitoringProcess extends TimerTask {
       if (delta >= this.timeLimit) {
         isStuck = true;
         numOfStuck++;
-        logger.warn("Thread <{}> is stuck", entry1.getKey());
+        logger.warn("Thread {} (0x{}) is stuck", entry1.getKey(),
+            Long.toHexString(entry1.getKey()));
         entry1.getValue().handleExpiry(delta);
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 85fab7f..1e83c40 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -23,6 +23,7 @@ import java.nio.channels.SocketChannel;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.logging.LogService;
 
 /**
@@ -81,21 +82,49 @@ public class NioPlainEngine implements NioFilter {
       DMStats stats) throws IOException {
     ByteBuffer buffer = wrappedBuffer;
 
-    while (lastReadPosition - lastProcessedPosition < bytes) {
-      buffer.limit(buffer.capacity());
-      buffer.position(lastReadPosition);
+    // logger.info("BRUCE: readAtLeast({}) lastProcessedPosition={} lastReadPosition={} buffer
+    // capacity={}",
+    // bytes, lastProcessedPosition, lastReadPosition, buffer.capacity());
+
+    Assert.assertTrue(buffer.capacity() - lastProcessedPosition >= bytes);
+
+    // read into the buffer starting at the end of valid data
+    buffer.limit(buffer.capacity());
+    buffer.position(lastReadPosition);
+
+    while (buffer.position() < (lastProcessedPosition + bytes)) {
       int amountRead = channel.read(buffer);
       if (amountRead < 0) {
         throw new EOFException();
       }
-      lastReadPosition = buffer.position();
     }
+
+    // keep track of how much of the buffer contains valid data with lastReadPosition
+    lastReadPosition = buffer.position();
+
+    // set up the buffer for reading and keep track of how much has been consumed with
+    // lastProcessedPosition
     buffer.limit(lastProcessedPosition + bytes);
     buffer.position(lastProcessedPosition);
-    lastProcessedPosition = buffer.limit();
+    lastProcessedPosition += bytes;
+
+    // logger.info("BRUCE: readAtLeast new lastProcessedPosition={} lastReadPosition={}",
+    // lastProcessedPosition, lastReadPosition);
+
     return buffer;
   }
 
+  public void doneReading(ByteBuffer unwrappedBuffer) {
+    // logger.info("BRUCE: nioFilter is compacting {}",
+    // Integer.toHexString(System.identityHashCode(unwrappedBuffer)));
+    if (unwrappedBuffer.position() != 0) {
+      unwrappedBuffer.compact();
+    } else {
+      unwrappedBuffer.position(unwrappedBuffer.limit());
+      unwrappedBuffer.limit(unwrappedBuffer.capacity());
+    }
+  }
+
   @Override
   public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
     return wrappedBuffer;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 17b5243..2f6c5ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -748,6 +748,7 @@ public class Connection implements Runnable {
   }
 
   private void notifyHandshakeWaiter(boolean success) {
+    // logger.info("BRUCE: notifyHandshakeWaiter invoked", new Exception("stack trace"));
     synchronized (this.handshakeSync) {
       if (success) {
         this.handshakeRead = true;
@@ -1581,6 +1582,7 @@ public class Connection implements Runnable {
       // make sure that if the reader thread exits we notify a thread waiting
       // for the handshake.
       // see bug 37524 for an example of listeners hung in waitForHandshake
+      // logger.info("BRUCE: run() invoking notifyHandshakeWaiter(false)");
       notifyHandshakeWaiter(false);
       this.readerThread.setName("unused p2p reader");
       synchronized (this.stateLock) {
@@ -1732,6 +1734,7 @@ public class Connection implements Runnable {
                 logger.debug("handshake has been cancelled {}", this);
               }
             }
+            // logger.info("BRUCE: handshake thread setting isHandshakeReader=true");
             isHandShakeReader = true;
             // Once we have read the handshake the reader can go away
             break;
@@ -2905,6 +2908,11 @@ public class Connection implements Runnable {
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+
+    // logger.info("BRUCE: processInputBuffer hash={} postion={} limit={}",
+    // Integer.toHexString(System.identityHashCode(inputBuffer)), inputBuffer.position(),
+    // inputBuffer.limit());
+
     inputBuffer.flip();
 
     ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
@@ -2938,10 +2946,10 @@ public class Connection implements Runnable {
             ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
             DataInputStream dis = new DataInputStream(bbis);
             if (!this.isReceiver) {
-              if (readHandshakeForSender(dis)) {
-                ioFilter.doneReading(peerDataBuffer);
-                return;
-              }
+              // we read the handshake and then stop processing since we don't want
+              // to process the input buffer anymore in a handshake thread
+              readHandshakeForSender(dis, peerDataBuffer);
+              return;
             } else {
               if (readHandshakeForReceiver(dis)) {
                 ioFilter.doneReading(peerDataBuffer);
@@ -2964,6 +2972,7 @@ public class Connection implements Runnable {
           }
         }
       } else {
+        // logger.info("BRUCE: processInputBuffer invoking doneReading");
         ioFilter.doneReading(peerDataBuffer);
         done = true;
       }
@@ -3277,31 +3286,55 @@ public class Connection implements Runnable {
     }
   }
 
-  private boolean readHandshakeForSender(DataInputStream dis) {
+  private void readHandshakeForSender(DataInputStream dis, ByteBuffer peerDataBuffer) {
     try {
       this.replyCode = dis.readUnsignedByte();
-      if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
-        this.asyncDistributionTimeout = dis.readInt();
-        this.asyncQueueTimeout = dis.readInt();
-        this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
-        if (this.asyncDistributionTimeout != 0) {
-          logger.info("{} async configuration received {}.",
-              p2pReaderName(),
-              " asyncDistributionTimeout=" + this.asyncDistributionTimeout
-                  + " asyncQueueTimeout=" + this.asyncQueueTimeout
-                  + " asyncMaxQueueSize="
-                  + (this.asyncMaxQueueSize / (1024 * 1024)));
-        }
-        // read the product version ordinal for on-the-fly serialization
-        // transformations (for rolling upgrades)
-        this.remoteVersion = Version.readVersion(dis, true);
+      // logger.info("BRUCE: readHandshakeForSender read replyCode {}", replyCode);
+      switch (replyCode) {
+        case REPLY_CODE_OK:
+          // logger.info("BRUCE: notifying handshake waitier - reply_code_ok");
+          ioFilter.doneReading(peerDataBuffer);
+          notifyHandshakeWaiter(true);
+          return;
+        case REPLY_CODE_OK_WITH_ASYNC_INFO:
+          this.asyncDistributionTimeout = dis.readInt();
+          this.asyncQueueTimeout = dis.readInt();
+          this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
+          if (this.asyncDistributionTimeout != 0) {
+            logger.info("{} async configuration received {}.",
+                p2pReaderName(),
+                " asyncDistributionTimeout=" + this.asyncDistributionTimeout
+                    + " asyncQueueTimeout=" + this.asyncQueueTimeout
+                    + " asyncMaxQueueSize="
+                    + (this.asyncMaxQueueSize / (1024 * 1024)));
+          }
+          // read the product version ordinal for on-the-fly serialization
+          // transformations (for rolling upgrades)
+          this.remoteVersion = Version.readVersion(dis, true);
+          ioFilter.doneReading(peerDataBuffer);
+          notifyHandshakeWaiter(true);
+          return;
+        default:
+          String err =
+              "Unknown handshake reply code: %s nioMessageLength: %s";
+          Object[] errArgs = new Object[] {this.replyCode,
+              messageLength};
+          if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+            logger.debug(
+                String.format(err, errArgs) + " (peer probably departed ungracefully)");
+          } else {
+            logger.fatal(err, errArgs);
+          }
+          this.readerShuttingDown = true;
+          requestClose(String.format(err, errArgs));
+          return;
       }
     } catch (Exception e) {
       this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
       logger.fatal("Error deserializing P2P handshake reply", e);
       this.readerShuttingDown = true;
       requestClose("Error deserializing P2P handshake reply");
-      return true;
+      return;
     } catch (ThreadDeath td) {
       throw td;
     } catch (VirtualMachineError err) {
@@ -3320,26 +3353,8 @@ public class Connection implements Runnable {
           t);
       this.readerShuttingDown = true;
       requestClose("Throwable deserializing P2P handshake reply");
-      return true;
-    }
-    if (this.replyCode != REPLY_CODE_OK
-        && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
-      String err =
-          "Unknown handshake reply code: %s nioMessageLength: %s";
-      Object[] errArgs = new Object[] {this.replyCode,
-          messageLength};
-      if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
-        logger.debug(
-            String.format(err, errArgs) + " (peer probably departed ungracefully)");
-      } else {
-        logger.fatal(err, errArgs);
-      }
-      this.readerShuttingDown = true;
-      requestClose(String.format(err, errArgs));
-      return true;
+      return;
     }
-    notifyHandshakeWaiter(true);
-    return false;
   }
 
   private void setThreadName(int dominoNumber) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 6470203..15a6389 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -15,14 +15,18 @@
 package org.apache.geode.internal.tcp;
 
 import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.Buffers;
 import org.apache.geode.internal.net.NioFilter;
 
@@ -32,6 +36,8 @@ import org.apache.geode.internal.net.NioFilter;
  *
  */
 public class MsgReader {
+  private static final Logger logger = LogService.getLogger();
+
   protected final Connection conn;
   protected final Header header = new Header();
   private final NioFilter ioFilter;
@@ -44,8 +50,10 @@ public class MsgReader {
     this.conn = conn;
     this.ioFilter = nioFilter;
     this.peerNetData = peerNetData;
-    ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
-    buffer.position(0).limit(0);
+    if (conn.getConduit().useSSL()) {
+      ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
+      buffer.position(0).limit(0);
+    }
     this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
@@ -53,19 +61,36 @@ public class MsgReader {
   Header readHeader() throws IOException {
     ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
 
-    int nioMessageLength = unwrappedBuffer.getInt();
-    /* nioMessageVersion = */ Connection.calcHdrVersion(nioMessageLength);
-    nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-    byte nioMessageType = unwrappedBuffer.get();
-    short nioMsgId = unwrappedBuffer.getShort();
+    // logger.info("BRUCE: MsgReader.readHeader buffer position {} limit {}",
+    // unwrappedBuffer.position(), unwrappedBuffer.limit());
+    Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
+
+    int position = unwrappedBuffer.position();
+    int limit = unwrappedBuffer.limit();
 
-    boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
-    if (directAck) {
-      nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
+    try {
+      int nioMessageLength = unwrappedBuffer.getInt();
+      /* nioMessageVersion = */
+      Connection.calcHdrVersion(nioMessageLength);
+      nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
+      byte nioMessageType = unwrappedBuffer.get();
+      short nioMsgId = unwrappedBuffer.getShort();
+
+      boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
+      if (directAck) {
+        nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
+      }
+
+      header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+
+      return header;
+    } catch (BufferUnderflowException e) {
+      dumpState("BufferUnderflowException", e, unwrappedBuffer, position, limit);
+      throw e;
+    } finally {
+      dumpState("readHeader", null, unwrappedBuffer, position, limit);
     }
 
-    header.setFields(nioMessageLength, nioMessageType, nioMsgId);
-    return header;
   }
 
   /**
@@ -79,10 +104,19 @@ public class MsgReader {
     Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
     this.getStats().incMessagesBeingReceived(true, header.messageLength);
     long startSer = this.getStats().startMsgDeserialization();
+    int position = nioInputBuffer.position();
+    int limit = nioInputBuffer.limit();
     try {
       byteBufferInputStream.setBuffer(nioInputBuffer);
       ReplyProcessor21.initMessageRPId();
+//      dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
       return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
+    } catch (RuntimeException e) {
+      dumpState("readMessage(1)", e, nioInputBuffer, position, limit);
+      throw e;
+    } catch (IOException e) {
+      dumpState("readMessage(2)", e, nioInputBuffer, position, limit);
+      throw e;
     } finally {
       this.getStats().endMsgDeserialization(startSer);
       this.getStats().decMessagesBeingReceived(header.messageLength);
@@ -90,6 +124,20 @@ public class MsgReader {
     }
   }
 
+  private void dumpState(String whereFrom, Throwable e, ByteBuffer inputBuffer, int position,
+      int limit) {
+     logger.info("BRUCE: {}, Connection to {}", whereFrom, conn.getRemoteAddress());
+     logger.info("BRUCE: {}, Message length {}; type {}; id {}",
+     whereFrom, header.messageLength, header.messageId, header.messageId);
+     logger.info("BRUCE: {}, starting buffer position {}; buffer limit {} buffer hash {}",
+     whereFrom, position, limit, Integer.toHexString(System.identityHashCode(inputBuffer)));
+     logger.info("BRUCE: {}, current buffer position {}; buffer limit {}",
+     whereFrom, inputBuffer.position(), inputBuffer.limit());
+     if (e != null) {
+     logger.info("BRUCE: Exception reading message", e);
+     }
+  }
+
   void readChunk(Header header, MsgDestreamer md)
       throws IOException {
     ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
@@ -102,8 +150,15 @@ public class MsgReader {
 
 
   private ByteBuffer readAtLeast(int bytes) throws IOException {
+    // logger.info("BRUCE: ensureWrappedCapacity need {} buffer {} position {} limit {} capacity
+    // {}", bytes,
+    // Integer.toHexString(System.identityHashCode(peerNetData)), peerNetData.position(),
+    // peerNetData.limit(), peerNetData.capacity());
     peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
         Buffers.BufferType.TRACKED_RECEIVER, getStats());
+    // logger.info("BRUCE: result buffer {} position {} limit {} capacity {}",
+    // Integer.toHexString(System.identityHashCode(peerNetData)), peerNetData.position(),
+    // peerNetData.limit(), peerNetData.capacity());
     return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
   }