You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/01/25 20:45:23 UTC
[geode] branch feature/GEODE-2113e updated: fixed race condition
between Sender Handshake and Sender Thread
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-2113e
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-2113e by this push:
new 9331843 fixed race condition between Sender Handshake and Sender Thread
9331843 is described below
commit 93318439b0d893e17eb996eb520569e1c62fe79b
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Jan 25 12:44:33 2019 -0800
fixed race condition between Sender Handshake and Sender Thread
---
.../monitoring/ThreadsMonitoringProcess.java | 3 +-
.../apache/geode/internal/net/NioPlainEngine.java | 39 +++++++--
.../org/apache/geode/internal/tcp/Connection.java | 95 +++++++++++++---------
.../org/apache/geode/internal/tcp/MsgReader.java | 79 +++++++++++++++---
4 files changed, 158 insertions(+), 58 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
index ebd4ce1..d6b3344 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
@@ -60,7 +60,8 @@ public class ThreadsMonitoringProcess extends TimerTask {
if (delta >= this.timeLimit) {
isStuck = true;
numOfStuck++;
- logger.warn("Thread <{}> is stuck", entry1.getKey());
+ logger.warn("Thread {} (0x{}) is stuck", entry1.getKey(),
+ Long.toHexString(entry1.getKey()));
entry1.getValue().handleExpiry(delta);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
index 85fab7f..1e83c40 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
@@ -23,6 +23,7 @@ import java.nio.channels.SocketChannel;
import org.apache.logging.log4j.Logger;
import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.internal.Assert;
import org.apache.geode.internal.logging.LogService;
/**
@@ -81,21 +82,49 @@ public class NioPlainEngine implements NioFilter {
DMStats stats) throws IOException {
ByteBuffer buffer = wrappedBuffer;
- while (lastReadPosition - lastProcessedPosition < bytes) {
- buffer.limit(buffer.capacity());
- buffer.position(lastReadPosition);
+ // logger.info("BRUCE: readAtLeast({}) lastProcessedPosition={} lastReadPosition={} buffer
+ // capacity={}",
+ // bytes, lastProcessedPosition, lastReadPosition, buffer.capacity());
+
+ Assert.assertTrue(buffer.capacity() - lastProcessedPosition >= bytes);
+
+ // read into the buffer starting at the end of valid data
+ buffer.limit(buffer.capacity());
+ buffer.position(lastReadPosition);
+
+ while (buffer.position() < (lastProcessedPosition + bytes)) {
int amountRead = channel.read(buffer);
if (amountRead < 0) {
throw new EOFException();
}
- lastReadPosition = buffer.position();
}
+
+ // keep track of how much of the buffer contains valid data with lastReadPosition
+ lastReadPosition = buffer.position();
+
+ // set up the buffer for reading and keep track of how much has been consumed with
+ // lastProcessedPosition
buffer.limit(lastProcessedPosition + bytes);
buffer.position(lastProcessedPosition);
- lastProcessedPosition = buffer.limit();
+ lastProcessedPosition += bytes;
+
+ // logger.info("BRUCE: readAtLeast new lastProcessedPosition={} lastReadPosition={}",
+ // lastProcessedPosition, lastReadPosition);
+
return buffer;
}
+ public void doneReading(ByteBuffer unwrappedBuffer) {
+ // Prepare the buffer for further writes: unread bytes stay at the front and position
+ // ends just past them, with limit raised to capacity.
+ if (unwrappedBuffer.position() != 0) {
+ unwrappedBuffer.compact();
+ } else {
+ unwrappedBuffer.position(unwrappedBuffer.limit()); // same position/limit compact() would yield from position 0
+ unwrappedBuffer.limit(unwrappedBuffer.capacity());
+ }
+ }
+
@Override
public ByteBuffer getUnwrappedBuffer(ByteBuffer wrappedBuffer) {
return wrappedBuffer;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 17b5243..2f6c5ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -748,6 +748,7 @@ public class Connection implements Runnable {
}
private void notifyHandshakeWaiter(boolean success) {
+ // logger.info("BRUCE: notifyHandshakeWaiter invoked", new Exception("stack trace"));
synchronized (this.handshakeSync) {
if (success) {
this.handshakeRead = true;
@@ -1581,6 +1582,7 @@ public class Connection implements Runnable {
// make sure that if the reader thread exits we notify a thread waiting
// for the handshake.
// see bug 37524 for an example of listeners hung in waitForHandshake
+ // logger.info("BRUCE: run() invoking notifyHandshakeWaiter(false)");
notifyHandshakeWaiter(false);
this.readerThread.setName("unused p2p reader");
synchronized (this.stateLock) {
@@ -1732,6 +1734,7 @@ public class Connection implements Runnable {
logger.debug("handshake has been cancelled {}", this);
}
}
+ // logger.info("BRUCE: handshake thread setting isHandshakeReader=true");
isHandShakeReader = true;
// Once we have read the handshake the reader can go away
break;
@@ -2905,6 +2908,11 @@ public class Connection implements Runnable {
* deserialized and passed to TCPConduit for further processing
*/
private void processInputBuffer() throws ConnectionException, IOException {
+
+ // logger.info("BRUCE: processInputBuffer hash={} position={} limit={}",
+ // Integer.toHexString(System.identityHashCode(inputBuffer)), inputBuffer.position(),
+ // inputBuffer.limit());
+
inputBuffer.flip();
ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
@@ -2938,10 +2946,10 @@ public class Connection implements Runnable {
ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
DataInputStream dis = new DataInputStream(bbis);
if (!this.isReceiver) {
- if (readHandshakeForSender(dis)) {
- ioFilter.doneReading(peerDataBuffer);
- return;
- }
+ // we read the handshake and then stop processing since we don't want
+ // to process the input buffer anymore in a handshake thread
+ readHandshakeForSender(dis, peerDataBuffer);
+ return;
} else {
if (readHandshakeForReceiver(dis)) {
ioFilter.doneReading(peerDataBuffer);
@@ -2964,6 +2972,7 @@ public class Connection implements Runnable {
}
}
} else {
+ // logger.info("BRUCE: processInputBuffer invoking doneReading");
ioFilter.doneReading(peerDataBuffer);
done = true;
}
@@ -3277,31 +3286,55 @@ public class Connection implements Runnable {
}
}
- private boolean readHandshakeForSender(DataInputStream dis) {
+ private void readHandshakeForSender(DataInputStream dis, ByteBuffer peerDataBuffer) {
try {
this.replyCode = dis.readUnsignedByte();
- if (this.replyCode == REPLY_CODE_OK_WITH_ASYNC_INFO) {
- this.asyncDistributionTimeout = dis.readInt();
- this.asyncQueueTimeout = dis.readInt();
- this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
- if (this.asyncDistributionTimeout != 0) {
- logger.info("{} async configuration received {}.",
- p2pReaderName(),
- " asyncDistributionTimeout=" + this.asyncDistributionTimeout
- + " asyncQueueTimeout=" + this.asyncQueueTimeout
- + " asyncMaxQueueSize="
- + (this.asyncMaxQueueSize / (1024 * 1024)));
- }
- // read the product version ordinal for on-the-fly serialization
- // transformations (for rolling upgrades)
- this.remoteVersion = Version.readVersion(dis, true);
+ // logger.info("BRUCE: readHandshakeForSender read replyCode {}", replyCode);
+ switch (replyCode) {
+ case REPLY_CODE_OK:
+ // logger.info("BRUCE: notifying handshake waiter - reply_code_ok");
+ ioFilter.doneReading(peerDataBuffer);
+ notifyHandshakeWaiter(true);
+ return;
+ case REPLY_CODE_OK_WITH_ASYNC_INFO:
+ this.asyncDistributionTimeout = dis.readInt();
+ this.asyncQueueTimeout = dis.readInt();
+ this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
+ if (this.asyncDistributionTimeout != 0) {
+ logger.info("{} async configuration received {}.",
+ p2pReaderName(),
+ " asyncDistributionTimeout=" + this.asyncDistributionTimeout
+ + " asyncQueueTimeout=" + this.asyncQueueTimeout
+ + " asyncMaxQueueSize="
+ + (this.asyncMaxQueueSize / (1024 * 1024)));
+ }
+ // read the product version ordinal for on-the-fly serialization
+ // transformations (for rolling upgrades)
+ this.remoteVersion = Version.readVersion(dis, true);
+ ioFilter.doneReading(peerDataBuffer);
+ notifyHandshakeWaiter(true);
+ return;
+ default:
+ String err =
+ "Unknown handshake reply code: %s nioMessageLength: %s";
+ Object[] errArgs = new Object[] {this.replyCode,
+ messageLength};
+ if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+ logger.debug(
+ String.format(err, errArgs) + " (peer probably departed ungracefully)");
+ } else {
+ logger.fatal(err, errArgs);
+ }
+ this.readerShuttingDown = true;
+ requestClose(String.format(err, errArgs));
+ return;
}
} catch (Exception e) {
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
logger.fatal("Error deserializing P2P handshake reply", e);
this.readerShuttingDown = true;
requestClose("Error deserializing P2P handshake reply");
- return true;
+ return;
} catch (ThreadDeath td) {
throw td;
} catch (VirtualMachineError err) {
@@ -3320,26 +3353,8 @@ public class Connection implements Runnable {
t);
this.readerShuttingDown = true;
requestClose("Throwable deserializing P2P handshake reply");
- return true;
- }
- if (this.replyCode != REPLY_CODE_OK
- && this.replyCode != REPLY_CODE_OK_WITH_ASYNC_INFO) {
- String err =
- "Unknown handshake reply code: %s nioMessageLength: %s";
- Object[] errArgs = new Object[] {this.replyCode,
- messageLength};
- if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
- logger.debug(
- String.format(err, errArgs) + " (peer probably departed ungracefully)");
- } else {
- logger.fatal(err, errArgs);
- }
- this.readerShuttingDown = true;
- requestClose(String.format(err, errArgs));
- return true;
+ return;
}
- notifyHandshakeWaiter(true);
- return false;
}
private void setThreadName(int dominoNumber) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 6470203..15a6389 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -15,14 +15,18 @@
package org.apache.geode.internal.tcp;
import java.io.IOException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
+import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.Buffers;
import org.apache.geode.internal.net.NioFilter;
@@ -32,6 +36,8 @@ import org.apache.geode.internal.net.NioFilter;
*
*/
public class MsgReader {
+ private static final Logger logger = LogService.getLogger();
+
protected final Connection conn;
protected final Header header = new Header();
private final NioFilter ioFilter;
@@ -44,8 +50,10 @@ public class MsgReader {
this.conn = conn;
this.ioFilter = nioFilter;
this.peerNetData = peerNetData;
- ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
- buffer.position(0).limit(0);
+ if (conn.getConduit().useSSL()) {
+ ByteBuffer buffer = ioFilter.getUnwrappedBuffer(peerNetData);
+ buffer.position(0).limit(0);
+ }
this.byteBufferInputStream =
version == null ? new ByteBufferInputStream() : new VersionedByteBufferInputStream(version);
}
@@ -53,19 +61,36 @@ public class MsgReader {
Header readHeader() throws IOException {
ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
- int nioMessageLength = unwrappedBuffer.getInt();
- /* nioMessageVersion = */ Connection.calcHdrVersion(nioMessageLength);
- nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
- byte nioMessageType = unwrappedBuffer.get();
- short nioMsgId = unwrappedBuffer.getShort();
+ // logger.info("BRUCE: MsgReader.readHeader buffer position {} limit {}",
+ // unwrappedBuffer.position(), unwrappedBuffer.limit());
+ Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
+
+ int position = unwrappedBuffer.position();
+ int limit = unwrappedBuffer.limit();
- boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
- if (directAck) {
- nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
+ try {
+ int nioMessageLength = unwrappedBuffer.getInt();
+ /* nioMessageVersion = */
+ Connection.calcHdrVersion(nioMessageLength);
+ nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
+ byte nioMessageType = unwrappedBuffer.get();
+ short nioMsgId = unwrappedBuffer.getShort();
+
+ boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
+ if (directAck) {
+ nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
+ }
+
+ header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+
+ return header;
+ } catch (BufferUnderflowException e) {
+ dumpState("BufferUnderflowException", e, unwrappedBuffer, position, limit);
+ throw e;
+ } finally {
+ dumpState("readHeader", null, unwrappedBuffer, position, limit);
}
- header.setFields(nioMessageLength, nioMessageType, nioMsgId);
- return header;
}
/**
@@ -79,10 +104,19 @@ public class MsgReader {
Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
this.getStats().incMessagesBeingReceived(true, header.messageLength);
long startSer = this.getStats().startMsgDeserialization();
+ int position = nioInputBuffer.position();
+ int limit = nioInputBuffer.limit();
try {
byteBufferInputStream.setBuffer(nioInputBuffer);
ReplyProcessor21.initMessageRPId();
+// dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
+ } catch (RuntimeException e) {
+ dumpState("readMessage(1)", e, nioInputBuffer, position, limit);
+ throw e;
+ } catch (IOException e) {
+ dumpState("readMessage(2)", e, nioInputBuffer, position, limit);
+ throw e;
} finally {
this.getStats().endMsgDeserialization(startSer);
this.getStats().decMessagesBeingReceived(header.messageLength);
@@ -90,6 +124,20 @@ public class MsgReader {
}
}
+ private void dumpState(String whereFrom, Throwable e, ByteBuffer inputBuffer, int position,
+ int limit) { // debug aid: position/limit are the values the caller captured before reading
+ logger.info("BRUCE: {}, Connection to {}", whereFrom, conn.getRemoteAddress());
+ logger.info("BRUCE: {}, Message length {}; type {}; id {}",
+ whereFrom, header.messageLength, header.messageType, header.messageId);
+ logger.info("BRUCE: {}, starting buffer position {}; buffer limit {} buffer hash {}",
+ whereFrom, position, limit, Integer.toHexString(System.identityHashCode(inputBuffer)));
+ logger.info("BRUCE: {}, current buffer position {}; buffer limit {}",
+ whereFrom, inputBuffer.position(), inputBuffer.limit());
+ if (e != null) {
+ logger.info("BRUCE: Exception reading message", e);
+ }
+ }
+
void readChunk(Header header, MsgDestreamer md)
throws IOException {
ByteBuffer unwrappedBuffer = readAtLeast(header.messageLength);
@@ -102,8 +150,15 @@ public class MsgReader {
private ByteBuffer readAtLeast(int bytes) throws IOException {
+ // logger.info("BRUCE: ensureWrappedCapacity need {} buffer {} position {} limit {} capacity
+ // {}", bytes,
+ // Integer.toHexString(System.identityHashCode(peerNetData)), peerNetData.position(),
+ // peerNetData.limit(), peerNetData.capacity());
peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData,
Buffers.BufferType.TRACKED_RECEIVER, getStats());
+ // logger.info("BRUCE: result buffer {} position {} limit {} capacity {}",
+ // Integer.toHexString(System.identityHashCode(peerNetData)), peerNetData.position(),
+ // peerNetData.limit(), peerNetData.capacity());
return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats());
}