You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2020/04/29 15:55:55 UTC
[geode] branch feature/GEODE-8020b updated: fixing TODO in
MsgStreamerList after seeing a thread create a VersionedMsgStreamer
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-8020b
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-8020b by this push:
new d9c3ed4 fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer
d9c3ed4 is described below
commit d9c3ed4529994917112a9f21af0639815d36bca2
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed Apr 29 08:54:27 2020 -0700
fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer
the test doesn't have multiple versions, so I think this TODO is relevant
to the SSL decryption exceptions we're seeing (GEODE-8020).
[warn 2020/04/28 14:22:11.075 PDT <vm_0_thr_4_bridge1_host1_12580> tid=0x86] BRUCE:
java.lang.Exception: stack trace
at org.apache.geode.internal.net.BufferPool.acquireDirectBuffer(BufferPool.java:115)
at org.apache.geode.internal.net.BufferPool.acquireDirectSenderBuffer(BufferPool.java:64)
at org.apache.geode.internal.tcp.MsgStreamer.<init>(MsgStreamer.java:132)
at org.apache.geode.internal.tcp.VersionedMsgStreamer.<init>(VersionedMsgStreamer.java:37)
at org.apache.geode.internal.tcp.MsgStreamer.create(MsgStreamer.java:199)
---
.../org/apache/geode/internal/net/BufferPool.java | 9 +++++++
.../org/apache/geode/internal/tcp/Connection.java | 10 +++++++-
.../org/apache/geode/internal/tcp/MsgStreamer.java | 2 +-
.../apache/geode/internal/tcp/MsgStreamerList.java | 29 ++++++++--------------
4 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index 0997c6e..80a09a7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,11 +19,15 @@ import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.internal.Assert;
+import org.apache.geode.logging.internal.log4j.api.LogService;
public class BufferPool {
private final DMStats stats;
+ private static final Logger logger = LogService.getLogger();
/**
* Buffers may be acquired from the Buffers pool
@@ -88,6 +92,8 @@ public class BufferPool {
} else if (bb.capacity() >= size) {
bb.rewind();
bb.limit(size);
+// logger.warn("BRUCE: acquiring pooled buffer {} hash {}", bb, Integer.toHexString(System.identityHashCode(bb)));
+
return bb;
} else {
// wasn't big enough so put it back in the queue
@@ -105,6 +111,8 @@ public class BufferPool {
ref = bufferQueue.poll();
}
result = ByteBuffer.allocateDirect(size);
+// logger.warn("BRUCE: allocating new pooled buffer {} hash {}", result, Integer.toHexString(System.identityHashCode(result)));
+// logger.warn("BRUCE: ", new Exception("stack trace"));
} else {
// if we are using heap buffers then don't bother with keeping them around
result = ByteBuffer.allocate(size);
@@ -228,6 +236,7 @@ public class BufferPool {
private void releaseBuffer(ByteBuffer bb, boolean send) {
if (bb.isDirect()) {
BBSoftReference bbRef = new BBSoftReference(bb, send);
+// logger.warn("BRUCE: releasing pooled buffer {} hash {}", bb, Integer.toHexString(System.identityHashCode(bb)));
bufferQueue.offer(bbRef);
} else {
if (send) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 719f48e..1723e09 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2725,7 +2725,15 @@ public class Connection implements Runnable {
private void processInputBuffer() throws ConnectionException, IOException {
inputBuffer.flip();
- ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
+ ByteBuffer peerDataBuffer;
+ try {
+ peerDataBuffer = ioFilter.unwrap(inputBuffer);
+ } catch (SSLException e) {
+// if (e.getMessage().contains("bad record MAC")) {
+// logger.warn("BRUCE: exception unwrapping buffer {} hash {}", inputBuffer, Integer.toHexString(System.identityHashCode(inputBuffer)));
+// }
+ throw e;
+ }
peerDataBuffer.flip();
boolean done = false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 9783397..1c3a2b5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -155,7 +155,7 @@ public class MsgStreamer extends OutputStream
int numVersioned = 0;
for (Object c : cons) {
con = (Connection) c;
- if ((version = con.getRemoteVersion()) != null) {
+ if ((version = con.getRemoteVersion()) != null && Version.CURRENT_ORDINAL > version.ordinal()) {
if (versionToConnMap == null) {
versionToConnMap = new Object2ObjectOpenHashMap();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
index 3d2446c..08b573c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
@@ -63,25 +63,16 @@ public class MsgStreamerList implements BaseMsgStreamer {
for (MsgStreamer streamer : this.streamers) {
if (ex != null) {
streamer.release();
- // TODO: shouldn't we call continue here?
- // It seems wrong to call writeMessage on a streamer we have just released.
- // But why do we call release on a streamer when we had an exception on one
- // of the previous streamer?
- // release clears the direct bb and returns it to the pool but leaves
- // it has the "buffer". THen we call writeMessage and it will use "buffer"
- // that has also been returned to the pool.
- // I think we only have a MsgStreamerList when a DS has a mix of versions
- // which usually is just during a rolling upgrade so that might be why we
- // haven't noticed this causing a bug.
- }
- try {
- result += streamer.writeMessage();
- // if there is an exception we need to finish the
- // loop and release the other streamer's buffers
- } catch (RuntimeException e) {
- ex = e;
- } catch (IOException e) {
- ioex = e;
+ } else {
+ try {
+ result += streamer.writeMessage();
+ // if there is an exception we need to finish the
+ // loop and release the other streamer's buffers
+ } catch (RuntimeException e) {
+ ex = e;
+ } catch (IOException e) {
+ ioex = e;
+ }
}
}
if (ex != null) {