You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/02/22 16:49:34 UTC
[geode] branch develop updated: GEODE-6389 CI Failure:
ConcurrentWANPropagation_1_DUnitTest.testReplicatedSerialPropagation_withoutRemoteSite
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 71dacf6 GEODE-6389 CI Failure: ConcurrentWANPropagation_1_DUnitTest.testReplicatedSerialPropagation_withoutRemoteSite
71dacf6 is described below
commit 71dacf6a6f8de535c07be4584ee3d054a41b10e3
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Feb 22 08:47:44 2019 -0800
GEODE-6389 CI Failure: ConcurrentWANPropagation_1_DUnitTest.testReplicatedSerialPropagation_withoutRemoteSite
This removes the sharing of buffers between threads in Connection.java
and MsgReader.java. The Buffers buffer-pooling mechanism cannot safely
handle buffers that are shared between threads, and I determined that it
wasn't really necessary to retain the handshake buffer for follow-up TLS
communications.
ClusterCommunicationsDUnitTest already covers testing of this change.
This closes #3209
---
.../org/apache/geode/internal/net/NioPlainEngine.java | 7 ++++++-
.../org/apache/geode/internal/net/NioSslEngine.java | 3 +++
.../org/apache/geode/internal/tcp/Connection.java | 19 ++++++++++++++++---
.../java/org/apache/geode/internal/tcp/MsgReader.java | 13 +++++++------
4 files changed, 32 insertions(+), 10 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index bf868f2..2c901e6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -55,7 +55,12 @@ public class NioPlainEngine implements NioFilter {
Buffers.BufferType bufferType, DMStats stats) {
ByteBuffer buffer = wrappedBuffer;
- if (buffer.capacity() > amount) {
+ if (buffer == null) {
+ buffer = Buffers.acquireBuffer(bufferType, amount, stats);
+ buffer.clear();
+ lastProcessedPosition = 0;
+ lastReadPosition = 0;
+ } else if (buffer.capacity() > amount) {
// we already have a buffer that's big enough
if (buffer.capacity() - lastProcessedPosition < amount) {
buffer.limit(lastReadPosition);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 6e81303..f2e0e36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -316,6 +316,9 @@ public class NioSslEngine implements NioFilter {
@Override
public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
Buffers.BufferType bufferType, DMStats stats) {
+ if (wrappedBuffer == null) {
+ wrappedBuffer = Buffers.acquireBuffer(bufferType, amount, stats);
+ }
return wrappedBuffer;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 7fcbee5..e659496 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -779,8 +779,6 @@ public class Connection implements Runnable {
// we do the close in a background thread because the operation may hang if
// there is a problem with the network. See bug #46659
- releaseInputBuffer();
-
// if simulating sickness, sockets must be closed in-line so that tests know
// that the vm is sick when the beSick operation completes
if (beingSickForTests) {
@@ -1448,6 +1446,11 @@ public class Connection implements Runnable {
}
// make sure our socket is closed
asyncClose(false);
+ if (!this.isReceiver) {
+ // receivers release the input buffer when exiting run(). Senders use the
+ // inputBuffer for reading direct-reply responses
+ releaseInputBuffer();
+ }
lengthSet = false;
} // synchronized
@@ -1585,7 +1588,14 @@ public class Connection implements Runnable {
}
asyncClose(false);
this.owner.removeAndCloseThreadOwnedSockets();
+ } else {
+ if (getConduit().useSSL()) {
+ ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
+ buffer.position(0).limit(0);
+ }
}
+ releaseInputBuffer();
+
// make sure that if the reader thread exits we notify a thread waiting
// for the handshake.
// see bug 37524 for an example of listeners hung in waitForHandshake
@@ -2828,7 +2838,7 @@ public class Connection implements Runnable {
DMStats stats = owner.getConduit().getStats();
final Version version = getRemoteVersion();
try {
- msgReader = new MsgReader(this, ioFilter, getInputBuffer(), version);
+ msgReader = new MsgReader(this, ioFilter, version);
Header header = msgReader.readHeader();
@@ -2903,6 +2913,9 @@ public class Connection implements Runnable {
getRemoteAddress());
this.ackTimedOut = false;
}
+ if (msgReader != null) {
+ msgReader.close();
+ }
}
synchronized (stateLock) {
this.connectionState = STATE_RECEIVED_ACK;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index adf0305..afb0272 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -46,14 +46,9 @@ public class MsgReader {
- MsgReader(Connection conn, NioFilter nioFilter, ByteBuffer peerNetData, Version version) {
+ MsgReader(Connection conn, NioFilter nioFilter, Version version) {
this.conn = conn;
this.ioFilter = nioFilter;
- this.peerNetData = peerNetData;
- if (conn.getConduit().useSSL()) {
- ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
- buffer.position(0).limit(0);
- }
this.byteBufferInputStream =
version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
}
@@ -134,6 +129,12 @@ public class MsgReader {
return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
}
+ public void close() {
+ if (peerNetData != null) {
+ Buffers.releaseReceiveBuffer(peerNetData, getStats());
+ }
+ }
+
private DMStats getStats() {