You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/29 15:55:55 UTC

[geode] branch feature/GEODE-8020b updated: fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-8020b
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-8020b by this push:
     new d9c3ed4  fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer
d9c3ed4 is described below

commit d9c3ed4529994917112a9f21af0639815d36bca2
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Apr 29 08:54:27 2020 -0700

    fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer
    
    the test doesn't run a mix of versions, yet a thread still created a
    VersionedMsgStreamer (see the stack trace below), so I think this TODO is
    relevant to the SSL decryption exceptions we're seeing (GEODE-8020).
    
    [warn 2020/04/28 14:22:11.075 PDT <vm_0_thr_4_bridge1_host1_12580> tid=0x86] BRUCE:
    java.lang.Exception: stack trace
      at org.apache.geode.internal.net.BufferPool.acquireDirectBuffer(BufferPool.java:115)
      at org.apache.geode.internal.net.BufferPool.acquireDirectSenderBuffer(BufferPool.java:64)
      at org.apache.geode.internal.tcp.MsgStreamer.<init>(MsgStreamer.java:132)
      at org.apache.geode.internal.tcp.VersionedMsgStreamer.<init>(VersionedMsgStreamer.java:37)
      at org.apache.geode.internal.tcp.MsgStreamer.create(MsgStreamer.java:199)
---
 .../org/apache/geode/internal/net/BufferPool.java  |  9 +++++++
 .../org/apache/geode/internal/tcp/Connection.java  | 10 +++++++-
 .../org/apache/geode/internal/tcp/MsgStreamer.java |  2 +-
 .../apache/geode/internal/tcp/MsgStreamerList.java | 29 ++++++++--------------
 4 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 0997c6e..80a09a7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,11 +19,15 @@ import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.logging.internal.log4j.api.LogService;
 
 public class BufferPool {
   private final DMStats stats;
+  private static final Logger logger = LogService.getLogger();
 
   /**
    * Buffers may be acquired from the Buffers pool
@@ -88,6 +92,8 @@ public class BufferPool {
         } else if (bb.capacity() >= size) {
           bb.rewind();
           bb.limit(size);
+//          logger.warn("BRUCE: acquiring pooled buffer {} hash {}", bb, Integer.toHexString(System.identityHashCode(bb)));
+
           return bb;
         } else {
           // wasn't big enough so put it back in the queue
@@ -105,6 +111,8 @@ public class BufferPool {
         ref = bufferQueue.poll();
       }
       result = ByteBuffer.allocateDirect(size);
+//      logger.warn("BRUCE: allocating new pooled buffer {} hash {}", result, Integer.toHexString(System.identityHashCode(result)));
+//      logger.warn("BRUCE: ", new Exception("stack trace"));
     } else {
       // if we are using heap buffers then don't bother with keeping them around
       result = ByteBuffer.allocate(size);
@@ -228,6 +236,7 @@ public class BufferPool {
   private void releaseBuffer(ByteBuffer bb, boolean send) {
     if (bb.isDirect()) {
       BBSoftReference bbRef = new BBSoftReference(bb, send);
+//      logger.warn("BRUCE: releasing pooled buffer {} hash {}", bb, Integer.toHexString(System.identityHashCode(bb)));
       bufferQueue.offer(bbRef);
     } else {
       if (send) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 719f48e..1723e09 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2725,7 +2725,15 @@ public class Connection implements Runnable {
   private void processInputBuffer() throws ConnectionException, IOException {
     inputBuffer.flip();
 
-    ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+    ByteBuffer peerDataBuffer;
+    try {
+      peerDataBuffer = ioFilter.unwrap(inputBuffer);
+    } catch (SSLException e) {
+//      if (e.getMessage().contains("bad record MAC")) {
+//        logger.warn("BRUCE: exception unwrapping buffer {} hash {}", inputBuffer, Integer.toHexString(System.identityHashCode(inputBuffer)));
+//      }
+      throw e;
+    }
     peerDataBuffer.flip();
 
     boolean done = false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 9783397..1c3a2b5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -155,7 +155,7 @@ public class MsgStreamer extends OutputStream
       int numVersioned = 0;
       for (Object c : cons) {
         con = (Connection) c;
-        if ((version = con.getRemoteVersion()) != null) {
+        if ((version = con.getRemoteVersion()) != null && Version.CURRENT_ORDINAL > version.ordinal()) {
           if (versionToConnMap == null) {
             versionToConnMap = new Object2ObjectOpenHashMap();
           }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
index 3d2446c..08b573c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
@@ -63,25 +63,16 @@ public class MsgStreamerList implements BaseMsgStreamer {
     for (MsgStreamer streamer : this.streamers) {
       if (ex != null) {
         streamer.release();
-        // TODO: shouldn't we call continue here?
-        // It seems wrong to call writeMessage on a streamer we have just released.
-        // But why do we call release on a streamer when we had an exception on one
-        // of the previous streamer?
-        // release clears the direct bb and returns it to the pool but leaves
-        // it has the "buffer". THen we call writeMessage and it will use "buffer"
-        // that has also been returned to the pool.
-        // I think we only have a MsgStreamerList when a DS has a mix of versions
-        // which usually is just during a rolling upgrade so that might be why we
-        // haven't noticed this causing a bug.
-      }
-      try {
-        result += streamer.writeMessage();
-        // if there is an exception we need to finish the
-        // loop and release the other streamer's buffers
-      } catch (RuntimeException e) {
-        ex = e;
-      } catch (IOException e) {
-        ioex = e;
+      } else {
+        try {
+          result += streamer.writeMessage();
+          // if there is an exception we need to finish the
+          // loop and release the other streamer's buffers
+        } catch (RuntimeException e) {
+          ex = e;
+        } catch (IOException e) {
+          ioex = e;
+        }
       }
     }
     if (ex != null) {