You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/01 21:02:56 UTC
[02/50] [abbrv] geode git commit: GEODE-3286: Failing to cleanup
connections from ConnectionTable receiver table
GEODE-3286: Failing to cleanup connections from ConnectionTable receiver table
- prevent adding a closed connection to the connection table's receivers
- add a new unit test for connection table
- adding connection factory object for creating receiving connections
- have the idle connection timeout ensure connections are removed from connection
table receivers
- modify tcpConduit stat accesses to allow easier mocking
Signed-off-by: Hitesh Khamesra <hi...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/8aed2684
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/8aed2684
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/8aed2684
Branch: refs/heads/feature/GEODE-3299
Commit: 8aed26846de6e9ff1c123acae98a7b5ce6d82a83
Parents: e7515f5
Author: Brian Rowe <br...@pivotal.io>
Authored: Tue Jul 25 15:43:35 2017 -0700
Committer: Hitesh Khamesra <hi...@yahoo.com>
Committed: Tue Jul 25 15:43:35 2017 -0700
----------------------------------------------------------------------
.../apache/geode/internal/tcp/Connection.java | 118 +++++++++++--------
.../geode/internal/tcp/ConnectionTable.java | 52 ++++----
.../apache/geode/internal/tcp/MsgReader.java | 2 +-
.../internal/tcp/PeerConnectionFactory.java | 32 +++++
.../apache/geode/internal/tcp/TCPConduit.java | 30 +++--
.../geode/internal/tcp/ConnectionTableTest.java | 66 +++++++++++
6 files changed, 213 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 1afe6ff..9b1a10a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -510,22 +510,22 @@ public class Connection implements Runnable {
* creates a connection that we accepted (it was initiated by an explicit connect being done on
* the other side). We will only receive data on this socket; never send.
*/
- protected static Connection createReceiver(ConnectionTable t, Socket s)
+ protected static Connection createReceiver(ConnectionTable table, Socket socket)
throws IOException, ConnectionException {
- Connection c = new Connection(t, s);
+ Connection connection = new Connection(table, socket);
boolean readerStarted = false;
try {
- c.startReader(t);
+ connection.startReader(table);
readerStarted = true;
} finally {
if (!readerStarted) {
- c.closeForReconnect(
+ connection.closeForReconnect(
LocalizedStrings.Connection_COULD_NOT_START_READER_THREAD.toLocalizedString());
}
}
- c.waitForHandshake();
- c.finishedConnecting = true;
- return c;
+ connection.waitForHandshake();
+ connection.finishedConnecting = true;
+ return connection;
}
/**
@@ -568,6 +568,12 @@ public class Connection implements Runnable {
}
}
+ protected void initRecevier() {
+ this.startReader(owner);
+ this.waitForHandshake();
+ this.finishedConnecting = true;
+ }
+
void setIdleTimeoutTask(SystemTimerTask task) {
this.idleTask = task;
}
@@ -591,7 +597,7 @@ public class Connection implements Runnable {
this.accessed = false;
if (isIdle) {
this.timedOut = true;
- this.owner.getConduit().stats.incLostLease();
+ this.owner.getConduit().getStats().incLostLease();
if (logger.isDebugEnabled()) {
logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource,
this.preserveOrder);
@@ -1059,7 +1065,7 @@ public class Connection implements Runnable {
LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0,
remoteAddr));
}
- t.getConduit().stats.incReconnectAttempts();
+ t.getConduit().getStats().incReconnectAttempts();
}
// create connection
try {
@@ -1086,7 +1092,7 @@ public class Connection implements Runnable {
} // IOException
finally {
if (conn == null) {
- t.getConduit().stats.incFailedConnect();
+ t.getConduit().getStats().incFailedConnect();
}
}
if (conn != null) {
@@ -1322,6 +1328,14 @@ public class Connection implements Runnable {
this.batchFlusher.start();
}
+ public void onIdleCancel() {
+ // Make sure receivers are removed from the connection table, this should always be a noop, but
+ // is done here as a failsafe.
+ if (isReceiver) {
+ owner.removeReceiver(this);
+ }
+ }
+
private class BatchBufferFlusher extends Thread {
private volatile boolean flushNeeded = false;
private volatile boolean timeToStop = false;
@@ -1330,7 +1344,7 @@ public class Connection implements Runnable {
public BatchBufferFlusher() {
setDaemon(true);
- this.stats = owner.getConduit().stats;
+ this.stats = owner.getConduit().getStats();
}
/**
@@ -1367,7 +1381,7 @@ public class Connection implements Runnable {
} // while
}
} finally {
- owner.getConduit().stats.incBatchWaitTime(start);
+ owner.getConduit().getStats().incBatchWaitTime(start);
}
}
@@ -1456,7 +1470,7 @@ public class Connection implements Runnable {
if (src.remaining() <= dst.remaining()) {
final long copyStart = DistributionStats.getStatTime();
dst.put(src);
- this.owner.getConduit().stats.incBatchCopyTime(copyStart);
+ this.owner.getConduit().getStats().incBatchCopyTime(copyStart);
return;
}
}
@@ -1465,7 +1479,7 @@ public class Connection implements Runnable {
this.batchFlusher.flushBuffer(dst);
} while (true);
} finally {
- this.owner.getConduit().stats.incBatchSendTime(start);
+ this.owner.getConduit().getStats().incBatchSendTime(start);
}
}
@@ -1537,7 +1551,7 @@ public class Connection implements Runnable {
this.connected = false;
closeSenderSem();
{
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
if (this.finishedConnecting) {
if (this.isReceiver) {
stats.decReceivers();
@@ -1684,7 +1698,7 @@ public class Connection implements Runnable {
initiateSuspicionIfSharedUnordered();
if (this.isReceiver) {
if (!this.sharedResource) {
- this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
+ this.conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get());
}
asyncClose(false);
this.owner.removeAndCloseThreadOwnedSockets();
@@ -1692,7 +1706,7 @@ public class Connection implements Runnable {
ByteBuffer tmp = this.nioInputBuffer;
if (tmp != null) {
this.nioInputBuffer = null;
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
Buffers.releaseReceiveBuffer(tmp, stats);
}
// make sure that if the reader thread exits we notify a thread waiting
@@ -1960,7 +1974,7 @@ public class Connection implements Runnable {
if (result != null) {
this.idleMsgDestreamer = null;
} else {
- result = new MsgDestreamer(this.owner.getConduit().stats,
+ result = new MsgDestreamer(this.owner.getConduit().getStats(),
this.conduit.getCancelCriterion(), v);
}
result.setName(p2pReaderName() + " msgId=" + msgId);
@@ -2019,7 +2033,7 @@ public class Connection implements Runnable {
}
}
- byte[] lenbytes = new byte[MSG_HEADER_BYTES];
+ byte[] headerBytes = new byte[MSG_HEADER_BYTES];
final ByteArrayDataInput dis = new ByteArrayDataInput();
while (!stopped) {
@@ -2040,20 +2054,20 @@ public class Connection implements Runnable {
break;
}
int len = 0;
- if (readFully(input, lenbytes, lenbytes.length) < 0) {
+ if (readFully(input, headerBytes, headerBytes.length) < 0) {
stopped = true;
continue;
}
// long recvNanos = DistributionStats.getStatTime();
- len = ((lenbytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000)
- + ((lenbytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000)
- + ((lenbytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100)
- + (lenbytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff);
+ len = ((headerBytes[MSG_HEADER_SIZE_OFFSET] & 0xff) * 0x1000000)
+ + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 1] & 0xff) * 0x10000)
+ + ((headerBytes[MSG_HEADER_SIZE_OFFSET + 2] & 0xff) * 0x100)
+ + (headerBytes[MSG_HEADER_SIZE_OFFSET + 3] & 0xff);
/* byte msgHdrVersion = */ calcHdrVersion(len);
len = calcMsgByteSize(len);
- int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
- short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
- + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
+ int msgType = headerBytes[MSG_HEADER_TYPE_OFFSET];
+ short msgId = (short) (((headerBytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
+ + (headerBytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
if (myDirectAck) {
msgType &= ~DIRECT_ACK_BIT; // clear the bit
@@ -2080,14 +2094,14 @@ public class Connection implements Runnable {
if (msgType == NORMAL_MSG_TYPE) {
// DMStats stats = this.owner.getConduit().stats;
// long start = DistributionStats.getStatTime();
- this.owner.getConduit().stats.incMessagesBeingReceived(true, len);
+ this.owner.getConduit().getStats().incMessagesBeingReceived(true, len);
dis.initialize(bytes, this.remoteVersion);
DistributionMessage msg = null;
try {
ReplyProcessor21.initMessageRPId();
- long startSer = this.owner.getConduit().stats.startMsgDeserialization();
+ long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
msg = (DistributionMessage) InternalDataSerializer.readDSFID(dis);
- this.owner.getConduit().stats.endMsgDeserialization(startSer);
+ this.owner.getConduit().getStats().endMsgDeserialization(startSer);
if (dis.available() != 0) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
@@ -2142,7 +2156,7 @@ public class Connection implements Runnable {
}
} else if (msgType == CHUNKED_MSG_TYPE) {
MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
- this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
+ this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len);
try {
md.addChunk(bytes);
} catch (IOException ex) {
@@ -2151,7 +2165,7 @@ public class Connection implements Runnable {
}
} else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
- this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0, len);
+ this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0, len);
try {
md.addChunk(bytes);
} catch (IOException ex) {
@@ -2166,13 +2180,13 @@ public class Connection implements Runnable {
try {
msg = md.getMessage();
} catch (ClassNotFoundException ex) {
- this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+ this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureEx = ex;
rpId = md.getRPid();
logger.warn(LocalizedMessage
.create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex));
} catch (IOException ex) {
- this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+ this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE
.toLocalizedString();
failureEx = ex;
@@ -2194,7 +2208,7 @@ public class Connection implements Runnable {
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
- this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+ this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE
.toLocalizedString();
failureEx = ex;
@@ -2333,7 +2347,7 @@ public class Connection implements Runnable {
// logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
// will prefer shared sockets");
}
- this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
+ this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
}
if (logger.isDebugEnabled()) {
@@ -2704,7 +2718,7 @@ public class Connection implements Runnable {
private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
throws ConnectionException {
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
long start = DistributionStats.getStatTime();
try {
ConflationKey ck = null;
@@ -2864,7 +2878,7 @@ public class Connection implements Runnable {
private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
ByteBuffer result = null;
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
long start = DistributionStats.getStatTime();
try {
synchronized (this.outgoingQueue) {
@@ -2965,7 +2979,7 @@ public class Connection implements Runnable {
*/
protected void runNioPusher() {
try {
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
final long threadStart = stats.startAsyncThread();
try {
stats.incAsyncQueues(1);
@@ -3279,7 +3293,7 @@ public class Connection implements Runnable {
*/
protected void nioWriteFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
DistributionMessage msg) throws IOException, ConnectionException {
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
if (!this.sharedResource) {
stats.incTOSentMsg();
}
@@ -3318,7 +3332,7 @@ public class Connection implements Runnable {
/** gets the buffer for receiving message length bytes */
protected ByteBuffer getNIOBuffer() {
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
if (nioInputBuffer == null) {
int allocSize = this.recvBufferSize;
if (allocSize == -1) {
@@ -3374,7 +3388,7 @@ public class Connection implements Runnable {
boolean origSocketInUse = this.socketInUse;
this.socketInUse = true;
MsgReader msgReader = null;
- DMStats stats = owner.getConduit().stats;
+ DMStats stats = owner.getConduit().getStats();
final Version version = getRemoteVersion();
try {
if (useNIO()) {
@@ -3518,7 +3532,7 @@ public class Connection implements Runnable {
nioInputBuffer.limit(startPos + nioMessageLength);
if (this.handshakeRead) {
if (nioMessageType == NORMAL_MSG_TYPE) {
- this.owner.getConduit().stats.incMessagesBeingReceived(true, nioMessageLength);
+ this.owner.getConduit().getStats().incMessagesBeingReceived(true, nioMessageLength);
ByteBufferInputStream bbis =
remoteVersion == null ? new ByteBufferInputStream(nioInputBuffer)
: new VersionedByteBufferInputStream(nioInputBuffer, remoteVersion);
@@ -3526,9 +3540,9 @@ public class Connection implements Runnable {
try {
ReplyProcessor21.initMessageRPId();
// add serialization stats
- long startSer = this.owner.getConduit().stats.startMsgDeserialization();
+ long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
- this.owner.getConduit().stats.endMsgDeserialization(startSer);
+ this.owner.getConduit().getStats().endMsgDeserialization(startSer);
if (bbis.available() != 0) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.Connection_MESSAGE_DESERIALIZATION_OF_0_DID_NOT_READ_1_BYTES,
@@ -3593,7 +3607,7 @@ public class Connection implements Runnable {
}
} else if (nioMessageType == CHUNKED_MSG_TYPE) {
MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
- this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0,
+ this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
nioMessageLength);
try {
md.addChunk(nioInputBuffer, nioMessageLength);
@@ -3604,7 +3618,7 @@ public class Connection implements Runnable {
} else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
// logger.info("END_CHUNK msgId="+nioMsgId);
MsgDestreamer md = obtainMsgDestreamer(nioMsgId, remoteVersion);
- this.owner.getConduit().stats.incMessagesBeingReceived(md.size() == 0,
+ this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
nioMessageLength);
try {
md.addChunk(nioInputBuffer, nioMessageLength);
@@ -3621,7 +3635,7 @@ public class Connection implements Runnable {
try {
msg = md.getMessage();
} catch (ClassNotFoundException ex) {
- this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+ this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE
.toLocalizedString();
failureEx = ex;
@@ -3629,7 +3643,7 @@ public class Connection implements Runnable {
logger.fatal(LocalizedMessage
.create(LocalizedStrings.Connection_CLASSNOTFOUND_DESERIALIZING_MESSAGE_0, ex));
} catch (IOException ex) {
- this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+ this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = LocalizedStrings.Connection_IOEXCEPTION_DESERIALIZING_MESSAGE
.toLocalizedString();
failureEx = ex;
@@ -3652,7 +3666,7 @@ public class Connection implements Runnable {
// is still usable:
SystemFailure.checkFailure();
this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
- this.owner.getConduit().stats.decMessagesBeingReceived(md.size());
+ this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = LocalizedStrings.Connection_UNEXPECTED_FAILURE_DESERIALIZING_MESSAGE
.toLocalizedString();
failureEx = ex;
@@ -3822,7 +3836,7 @@ public class Connection implements Runnable {
// } else {
// ConnectionTable.threadWantsSharedResources();
}
- this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
+ this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
// Because this thread is not shared resource, it will be used for direct
// ack. Direct ack messages can be large. This call will resize the send
// buffer.
@@ -3915,7 +3929,7 @@ public class Connection implements Runnable {
private void compactOrResizeBuffer(int messageLength) {
final int oldBufferSize = nioInputBuffer.capacity();
- final DMStats stats = this.owner.getConduit().stats;
+ final DMStats stats = this.owner.getConduit().getStats();
int allocSize = messageLength + MSG_HEADER_BYTES;
if (oldBufferSize < allocSize) {
// need a bigger buffer
http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index c55af82..affe5cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -114,6 +114,7 @@ public class ConnectionTable {
* acks, will be put in this map.
*/
protected final Map unorderedConnectionMap = new ConcurrentHashMap();
+
/**
* Used for all accepted connections. These connections are read only; we never send messages,
* except for acks; only receive.
@@ -201,14 +202,14 @@ public class ConnectionTable {
}
- private ConnectionTable(TCPConduit c) throws IOException {
- this.owner = c;
+ private ConnectionTable(TCPConduit conduit) throws IOException {
+ this.owner = conduit;
this.idleConnTimer = (this.owner.idleConnectionTimeout != 0)
- ? new SystemTimer(c.getDM().getSystem(), true) : null;
+ ? new SystemTimer(conduit.getDM().getSystem(), true) : null;
this.threadOrderedConnMap = new ThreadLocal();
this.threadConnMaps = new ArrayList();
this.threadConnectionMap = new ConcurrentHashMap();
- this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
+ this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
this.socketCloser = new SocketCloser();
}
@@ -248,14 +249,14 @@ public class ConnectionTable {
// }
/** conduit calls acceptConnection after an accept */
- protected void acceptConnection(Socket sock) throws IOException, ConnectionException {
- Connection connection = null;
+ protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
+ throws IOException, ConnectionException, InterruptedException {
InetAddress connAddress = sock.getInetAddress(); // for bug 44736
boolean finishedConnecting = false;
- Connection conn = null;
+ Connection connection = null;
// boolean exceptionLogged = false;
try {
- conn = Connection.createReceiver(this, sock);
+ connection = peerConnectionFactory.createReceiver(this, sock);
// check for shutdown (so it doesn't get missed in the finally block)
this.owner.getCancelCriterion().checkCancelInProgress(null);
@@ -279,26 +280,29 @@ public class ConnectionTable {
// in our caller.
// no need to log error here since caller will log warning
- if (conn != null && !finishedConnecting) {
+ if (connection != null && !finishedConnecting) {
// we must be throwing from checkCancelInProgress so close the connection
- closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(), conn);
- conn = null;
+ closeCon(LocalizedStrings.ConnectionTable_CANCEL_AFTER_ACCEPT.toLocalizedString(),
+ connection);
+ connection = null;
}
}
- if (conn != null) {
+ if (connection != null) {
synchronized (this.receivers) {
- this.owner.stats.incReceivers();
+ this.owner.getStats().incReceivers();
if (this.closed) {
closeCon(LocalizedStrings.ConnectionTable_CONNECTION_TABLE_NO_LONGER_IN_USE
- .toLocalizedString(), conn);
+ .toLocalizedString(), connection);
return;
}
- this.receivers.add(conn);
+ if (!connection.isSocketClosed()) {
+ this.receivers.add(connection);
+ }
}
if (logger.isDebugEnabled()) {
- logger.debug("Accepted {} myAddr={} theirAddr={}", conn, getConduit().getMemberId(),
- conn.remoteAddr);
+ logger.debug("Accepted {} myAddr={} theirAddr={}", connection, getConduit().getMemberId(),
+ connection.remoteAddr);
}
}
}
@@ -328,11 +332,11 @@ public class ConnectionTable {
try {
con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, id,
sharedResource, startTime, ackThreshold, ackSAThreshold);
- this.owner.stats.incSenders(sharedResource, preserveOrder);
+ this.owner.getStats().incSenders(sharedResource, preserveOrder);
} finally {
// our connection failed to notify anyone waiting for our pending con
if (con == null) {
- this.owner.stats.incFailedConnect();
+ this.owner.getStats().incFailedConnect();
synchronized (m) {
Object rmObj = m.remove(id);
if (rmObj != pc && rmObj != null) {
@@ -521,7 +525,7 @@ public class ConnectionTable {
if (logger.isDebugEnabled()) {
logger.debug("ConnectionTable: created an ordered connection: {}", result);
}
- this.owner.stats.incSenders(false/* shared */, true /* preserveOrder */);
+ this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder */);
// Update the list of connections owned by this thread....
@@ -1309,6 +1313,10 @@ public class ConnectionTable {
@Override
public boolean cancel() {
+ Connection con = this.c;
+ if (con != null) {
+ con.onIdleCancel();
+ }
this.c = null;
return super.cancel();
}
@@ -1355,5 +1363,7 @@ public class ConnectionTable {
}
}
-
+ public int getNumberOfReceivers() {
+ return receivers.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index be1f533..1bbfd08 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -94,7 +94,7 @@ public abstract class MsgReader {
public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
protected DMStats getStats() {
- return conn.getConduit().stats;
+ return conn.getConduit().getStats();
}
public static class Header {
http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
new file mode 100644
index 0000000..7bf9638
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/PeerConnectionFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+public class PeerConnectionFactory {
+ /**
+ * creates a connection that we accepted (it was initiated by an explicit connect being done on
+ * the other side). We will only receive data on this socket; never send.
+ */
+ public Connection createReceiver(ConnectionTable table, Socket socket)
+ throws IOException, ConnectionException {
+ Connection connection = new Connection(table, socket);
+ connection.initRecevier();
+ return connection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 991aaf7..c52a676 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -20,7 +20,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
@@ -189,10 +188,8 @@ public class TCPConduit implements Runnable {
* the object that receives DistributionMessage messages received by this conduit.
*/
private final DirectChannel directChannel;
- /**
- * Stats from the delegate
- */
- DMStats stats;
+
+ private DMStats stats;
/**
* Config from the delegate
@@ -260,7 +257,7 @@ public class TCPConduit implements Runnable {
this.stats = directChannel.getDMStats();
this.config = directChannel.getDMConfig();
}
- if (this.stats == null) {
+ if (this.getStats() == null) {
this.stats = new LonerDistributionManager.DummyDMStats();
}
@@ -642,7 +639,7 @@ public class TCPConduit implements Runnable {
if (directChannel != null) {
this.stats = directChannel.getDMStats();
}
- if (this.stats == null) {
+ if (this.getStats() == null) {
this.stats = new LonerDistributionManager.DummyDMStats();
}
try {
@@ -745,7 +742,7 @@ public class TCPConduit implements Runnable {
}
}
} else {
- this.stats.incFailedAccept();
+ this.getStats().incFailedAccept();
if (e instanceof IOException && "Too many open files".equals(e.getMessage())) {
getConTable().fileDescriptorsExhausted();
} else {
@@ -800,16 +797,16 @@ public class TCPConduit implements Runnable {
protected void basicAcceptConnection(Socket othersock) {
try {
- getConTable().acceptConnection(othersock);
+ getConTable().acceptConnection(othersock, new PeerConnectionFactory());
} catch (IOException io) {
// exception is logged by the Connection
if (!stopped) {
- this.stats.incFailedAccept();
+ this.getStats().incFailedAccept();
}
} catch (ConnectionException ex) {
// exception is logged by the Connection
if (!stopped) {
- this.stats.incFailedAccept();
+ this.getStats().incFailedAccept();
}
} catch (CancelException e) {
} catch (Exception e) {
@@ -820,7 +817,7 @@ public class TCPConduit implements Runnable {
// }
// else
{
- this.stats.incFailedAccept();
+ this.getStats().incFailedAccept();
logger.warn(LocalizedMessage.create(
LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
new Object[] {othersock.getInetAddress(), e}), e);
@@ -977,7 +974,7 @@ public class TCPConduit implements Runnable {
}
// Close the connection (it will get rebuilt later).
- this.stats.incReconnectAttempts();
+ this.getStats().incReconnectAttempts();
if (conn != null) {
try {
if (logger.isDebugEnabled()) {
@@ -1172,6 +1169,13 @@ public class TCPConduit implements Runnable {
return (ct != null) && ct.hasReceiversFor(endPoint);
}
+ /**
+ * Stats from the delegate
+ */
+ public DMStats getStats() {
+ return stats;
+ }
+
protected class Stopper extends CancelCriterion {
/*
http://git-wip-us.apache.org/repos/asf/geode/blob/8aed2684/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
new file mode 100644
index 0000000..312c64d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.Socket;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@Category(UnitTest.class)
+public class ConnectionTableTest {
+
+ @Test
+ public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws Exception {
+ InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+ when(system.isShareSockets()).thenReturn(false);
+
+ DM dm = mock(DM.class);
+ when(dm.getSystem()).thenReturn(system);
+
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ DMStats dmStats = mock(DMStats.class);
+
+ TCPConduit tcpConduit = mock(TCPConduit.class);
+ when(tcpConduit.getDM()).thenReturn(dm);
+ when(tcpConduit.getCancelCriterion()).thenReturn(cancelCriterion);
+ when(tcpConduit.getStats()).thenReturn(dmStats);
+
+ Connection connection = mock(Connection.class);
+ when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed as soon at it was
+ // created
+
+ Socket socket = mock(Socket.class);
+
+ ConnectionTable table = ConnectionTable.create(tcpConduit);
+
+ PeerConnectionFactory factory = mock(PeerConnectionFactory.class);
+ when(factory.createReceiver(table, socket)).thenReturn(connection);
+
+ table.acceptConnection(socket, factory);
+ assertEquals(0, table.getNumberOfReceivers());
+ }
+}