You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/02/22 16:49:34 UTC

[geode] branch develop updated: GEODE-6389 CI Failure: ConcurrentWANPropagation_1_DUnitTest.testReplicatedSerialPropagation_withoutRemoteSite

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 71dacf6  GEODE-6389 CI Failure: ConcurrentWANPropagation_1_DUnitTest.testReplicatedSerialPropagation_withoutRemoteSite
71dacf6 is described below

commit 71dacf6a6f8de535c07be4584ee3d054a41b10e3
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Feb 22 08:47:44 2019 -0800

    GEODE-6389 CI Failure: ConcurrentWANPropagation_1_DUnitTest.testReplicatedSerialPropagation_withoutRemoteSite
    
    This removes the sharing of buffers between threads in Connection.java
    and MsgReader.java.  The pooling mechanism in Buffers isn't up to
    handling buffers that are shared between threads, and I determined that
    it wasn't really necessary to retain the handshake buffer for follow-up
    TLS communications.
    
    ClusterCommunicationsDUnitTest already covers testing of this change.
    
    This closes #3209
---
 .../org/apache/geode/internal/net/NioPlainEngine.java |  7 ++++++-
 .../org/apache/geode/internal/net/NioSslEngine.java   |  3 +++
 .../org/apache/geode/internal/tcp/Connection.java     | 19 ++++++++++++++++---
 .../java/org/apache/geode/internal/tcp/MsgReader.java | 13 +++++++------
 4 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index bf868f2..2c901e6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -55,7 +55,12 @@ public class NioPlainEngine implements NioFilter {
       Buffers.BufferType bufferType, DMStats stats) {
     ByteBuffer buffer = wrappedBuffer;
 
-    if (buffer.capacity() > amount) {
+    if (buffer == null) {
+      buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+      buffer.clear();
+      lastProcessedPosition = 0;
+      lastReadPosition = 0;
+    } else if (buffer.capacity() > amount) {
       // we already have a buffer that's big enough
       if (buffer.capacity() - lastProcessedPosition < amount) {
         buffer.limit(lastReadPosition);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 6e81303..f2e0e36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -316,6 +316,9 @@ public class NioSslEngine implements NioFilter {
   @Override
   public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
       Buffers.BufferType bufferType, DMStats stats) {
+    if (wrappedBuffer == null) {
+      wrappedBuffer = Buffers.acquireBuffer(bufferType, amount, stats);
+    }
     return wrappedBuffer;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 7fcbee5..e659496 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -779,8 +779,6 @@ public class Connection implements Runnable {
     // we do the close in a background thread because the operation may hang if
     // there is a problem with the network. See bug #46659
 
-    releaseInputBuffer();
-
     // if simulating sickness, sockets must be closed in-line so that tests know
     // that the vm is sick when the beSick operation completes
     if (beingSickForTests) {
@@ -1448,6 +1446,11 @@ public class Connection implements Runnable {
         }
         // make sure our socket is closed
         asyncClose(false);
+        if (!this.isReceiver) {
+          // receivers release the input buffer when exiting run(). Senders use the
+          // inputBuffer for reading direct-reply responses
+          releaseInputBuffer();
+        }
         lengthSet = false;
       } // synchronized
 
@@ -1585,7 +1588,14 @@ public class Connection implements Runnable {
         }
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
+      } else {
+        if (getConduit().useSSL()) {
+          ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
+          buffer.position(0).limit(0);
+        }
       }
+      releaseInputBuffer();
+
       // make sure that if the reader thread exits we notify a thread waiting
       // for the handshake.
       // see bug 37524 for an example of listeners hung in waitForHandshake
@@ -2828,7 +2838,7 @@ public class Connection implements Runnable {
     DMStats stats = owner.getConduit().getStats();
     final Version version = getRemoteVersion();
     try {
-      msgReader = new MsgReader(this, ioFilter, getInputBuffer(), version);
+      msgReader = new MsgReader(this, ioFilter, version);
 
       Header header = msgReader.readHeader();
 
@@ -2903,6 +2913,9 @@ public class Connection implements Runnable {
             getRemoteAddress());
         this.ackTimedOut = false;
       }
+      if (msgReader != null) {
+        msgReader.close();
+      }
     }
     synchronized (stateLock) {
       this.connectionState = STATE_RECEIVED_ACK;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index adf0305..afb0272 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -46,14 +46,9 @@ public class MsgReader {
 
 
 
-  MsgReader(Connection conn, NioFilter nioFilter, ByteBuffer peerNetData, Version version) {
+  MsgReader(Connection conn, NioFilter nioFilter, Version version) {
     this.conn = conn;
     this.ioFilter = nioFilter;
-    this.peerNetData = peerNetData;
-    if (conn.getConduit().useSSL()) {
-      ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
-      buffer.position(0).limit(0);
-    }
     this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
   }
@@ -134,6 +129,12 @@ public class MsgReader {
     return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
   }
 
+  public void close() {
+    if (peerNetData != null) {
+      Buffers.releaseReceiveBuffer(peerNetData, getStats());
+    }
+  }
+
 
 
   private DMStats getStats() {