You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2019/12/07 00:21:13 UTC
[geode] 01/02: GEODE-7256: Cleanup Connection classes and tests
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch GEODE-7256-AlterRuntimeCommandDUnitTest
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 818d4ffb57bf742b08016f039516fc8076f53af4
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Dec 6 12:52:34 2019 -0800
GEODE-7256: Cleanup Connection classes and tests
* Minor cleanup of classes and tests
---
.../internal/tcp/ConnectionIntegrationTest.java | 61 +-
.../org/apache/geode/internal/tcp/Connection.java | 2060 +++++++++-----------
.../apache/geode/internal/tcp/ConnectionTable.java | 685 +++----
.../org/apache/geode/internal/tcp/TCPConduit.java | 375 ++--
.../geode/internal/tcp/ConnectionJUnitTest.java | 108 -
.../geode/internal/tcp/ConnectionTableTest.java | 22 +-
.../apache/geode/internal/tcp/ConnectionTest.java | 74 +-
7 files changed, 1545 insertions(+), 1840 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java
index 1a0db82..3413eec 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/tcp/ConnectionIntegrationTest.java
@@ -14,6 +14,10 @@
*/
package org.apache.geode.internal.tcp;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
+import static org.apache.geode.test.assertj.LogFileAssert.assertThat;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.quality.Strictness.STRICT_STUBS;
@@ -26,6 +30,8 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Properties;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -34,20 +40,23 @@ import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.apache.geode.cache.Cache;
-import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.assertj.LogFileAssert;
-import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.rules.CacheRule;
import org.apache.geode.test.junit.categories.MembershipTest;
-@Category({MembershipTest.class})
+@Category(MembershipTest.class)
public class ConnectionIntegrationTest {
+ private static final String EXPECTED_EXCEPTION_MESSAGE =
+ "Unknown handshake reply code: 99 messageLength: 0";
+
+ private File logFile;
+ private Cache cache;
+
@Rule
- public final MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+ public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -55,36 +64,44 @@ public class ConnectionIntegrationTest {
@Rule
public CacheRule cacheRule = new CacheRule();
- @Test
- public void badHeaderMessageIsCorrectlyLogged() throws Exception {
+ @Before
+ public void setUp() throws Exception {
+ logFile = temporaryFolder.newFile();
+
Properties properties = new Properties();
- properties.put(ConfigurationProperties.LOCATORS, ""); // loner system
- File logFile = temporaryFolder.newFile();
- properties.put(ConfigurationProperties.LOG_FILE, logFile.getAbsolutePath());
- cacheRule.createCache(properties);
- Cache cache = cacheRule.getCache();
+ properties.setProperty(LOCATORS, "");
+ properties.setProperty(LOG_FILE, logFile.getAbsolutePath());
- final String expectedException = "Unknown handshake reply code: 99 messageLength: 0";
+ cache = cacheRule.getOrCreateCache(properties);
- IgnoredException.addIgnoredException(expectedException);
+ addIgnoredException(EXPECTED_EXCEPTION_MESSAGE);
+ }
+ @After
+ public void tearDown() {
+ cache.close();
+ }
+
+ @Test
+ public void badHeaderMessageIsCorrectlyLogged() {
ConnectionTable connectionTable = mock(ConnectionTable.class);
+ DistributionConfig config = mock(DistributionConfig.class);
+ Socket socket = mock(Socket.class);
TCPConduit tcpConduit = mock(TCPConduit.class);
+
when(connectionTable.getConduit()).thenReturn(tcpConduit);
when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress("localhost", 1234));
- DistributionConfig config = mock(DistributionConfig.class);
when(config.getEnableNetworkPartitionDetection()).thenReturn(false);
when(tcpConduit.getConfig()).thenReturn(config);
when(tcpConduit.getMemberId()).thenReturn(new InternalDistributedMember("localhost", 2345));
when(connectionTable.getSocketCloser()).thenReturn(mock(SocketCloser.class));
- Socket socket = mock(Socket.class);
+
Connection connection = new Connection(connectionTable, socket);
- ByteBuffer peerDataBuffer = ByteBuffer.allocate(100);
- byte[] bytes = new byte[] {99};
- ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(new byte[] {99});
DataInputStream inputStream = new DataInputStream(byteArrayInputStream);
- connection.readHandshakeForSender(inputStream, peerDataBuffer);
- cache.close();
- LogFileAssert.assertThat(logFile).contains(expectedException);
+
+ connection.readHandshakeForSender(inputStream, ByteBuffer.allocate(100));
+
+ assertThat(logFile).contains(EXPECTED_EXCEPTION_MESSAGE);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 5c95d0d..d813b69 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -14,8 +14,13 @@
*/
package org.apache.geode.internal.tcp;
+import static java.lang.Boolean.FALSE;
+import static java.lang.ThreadLocal.withInitial;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
+import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.ConnectException;
@@ -30,7 +35,6 @@ import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -40,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
import org.apache.logging.log4j.Logger;
@@ -47,6 +52,7 @@ import org.apache.geode.CancelException;
import org.apache.geode.SerializationException;
import org.apache.geode.SystemFailure;
import org.apache.geode.alerting.internal.spi.AlertingAction;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheClosedException;
@@ -57,7 +63,6 @@ import org.apache.geode.distributed.internal.ConflationKey;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.DistributionStats;
@@ -69,7 +74,6 @@ import org.apache.geode.distributed.internal.ReplySender;
import org.apache.geode.distributed.internal.direct.DirectChannel;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.gms.api.Membership;
-import org.apache.geode.distributed.internal.membership.gms.api.MembershipStatistics;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.InternalDataSerializer;
@@ -93,12 +97,13 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
*/
public class Connection implements Runnable {
private static final Logger logger = LogService.getLogger();
+
public static final String THREAD_KIND_IDENTIFIER = "P2P message reader";
@MakeNotStatic
private static int P2P_CONNECT_TIMEOUT;
@MakeNotStatic
- private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED = false;
+ private static boolean IS_P2P_CONNECT_TIMEOUT_INITIALIZED;
static final int NORMAL_MSG_TYPE = 0x4c;
static final int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
@@ -114,18 +119,25 @@ public class Connection implements Runnable {
* Small buffer used for send socket buffer on receiver connections and receive buffer on sender
* connections.
*/
- public static final int SMALL_BUFFER_SIZE =
- Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
+ static final int SMALL_BUFFER_SIZE =
+ Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096);
- /** counter to give connections a unique id */
+ /**
+ * counter to give connections a unique id
+ */
@MakeNotStatic
- private static final AtomicLong idCounter = new AtomicLong(1);
+ private static final AtomicLong ID_COUNTER = new AtomicLong(1);
- /** string used as the reason for initiating suspect processing */
+ /**
+ * string used as the reason for initiating suspect processing
+ */
+ @VisibleForTesting
public static final String INITIATING_SUSPECT_PROCESSING =
"member unexpectedly shut down shared, unordered connection";
- /** the table holding this connection */
+ /**
+ * the table holding this connection
+ */
private final ConnectionTable owner;
private final TCPConduit conduit;
@@ -135,54 +147,19 @@ public class Connection implements Runnable {
* Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
* thread may be a pooled thread.
*/
- private volatile boolean isRunning = false;
+ private volatile boolean isRunning;
- /** true if connection is a shared resource that can be used by more than one thread */
+ /**
+ * true if connection is a shared resource that can be used by more than one thread
+ */
private boolean sharedResource;
- public boolean isSharedResource() {
- return this.sharedResource;
- }
-
- /** The idle timeout timer task for this connection */
+ /**
+ * The idle timeout timer task for this connection
+ */
private SystemTimerTask idleTask;
- private static final ThreadLocal<Boolean> isReaderThread = new ThreadLocal<Boolean>() {
- @Override
- public Boolean initialValue() {
- return Boolean.FALSE;
- }
- };
-
- public static void makeReaderThread() {
- // mark this thread as a reader thread
- makeReaderThread(true);
- }
-
- private static void makeReaderThread(boolean v) {
- isReaderThread.set(v);
- }
-
- // return true if this thread is a reader thread
- private static boolean isReaderThread() {
- return isReaderThread.get();
- }
-
- int getP2PConnectTimeout(DistributionConfig config) {
- if (AlertingAction.isThreadAlerting()) {
- return config.getMemberTimeout();
- }
- if (IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
- return P2P_CONNECT_TIMEOUT;
- String connectTimeoutStr = System.getProperty("p2p.connectTimeout");
- if (connectTimeoutStr != null) {
- P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
- } else {
- P2P_CONNECT_TIMEOUT = 6 * config.getMemberTimeout();
- }
- IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
- return P2P_CONNECT_TIMEOUT;
- }
+ private static final ThreadLocal<Boolean> isReaderThread = withInitial(() -> FALSE);
/**
* If true then readers for thread owned sockets will send all messages on thread owned senders.
@@ -191,40 +168,24 @@ public class Connection implements Runnable {
private static final boolean DOMINO_THREAD_OWNED_SOCKETS =
Boolean.getBoolean("p2p.ENABLE_DOMINO_THREAD_OWNED_SOCKETS");
- private static final ThreadLocal<Boolean> isDominoThread = new ThreadLocal<Boolean>() {
- @Override
- public Boolean initialValue() {
- return Boolean.FALSE;
- }
- };
+ private static final ThreadLocal<Boolean> isDominoThread = withInitial(() -> FALSE);
- // return true if this thread is a reader thread
- private static boolean tipDomino() {
- if (DOMINO_THREAD_OWNED_SOCKETS) {
- // mark this thread as one who wants to send ALL on TO sockets
- ConnectionTable.threadWantsOwnResources();
- isDominoThread.set(Boolean.TRUE);
- return true;
- } else {
- return false;
- }
- }
-
- public static boolean isDominoThread() {
- return isDominoThread.get();
- }
-
- /** the socket entrusted to this connection */
+ /**
+ * the socket entrusted to this connection
+ */
private final Socket socket;
- /** output stream/channel lock */
+ /**
+ * output stream/channel lock
+ */
private final Object outLock = new Object();
- /** the ID string of the conduit (for logging) */
- private String conduitIdStr;
+ /**
+ * the ID string of the conduit (for logging)
+ */
+ private final String conduitIdStr;
- /** Identifies the java group member on the other side of the connection. */
- InternalDistributedMember remoteAddr;
+ private InternalDistributedMember remoteAddr;
/**
* Identifies the version of the member on the other side of the connection.
@@ -242,19 +203,14 @@ public class Connection implements Runnable {
* instance, server-connection -> owned p2p reader (count 0) -> owned p2p reader (count 1) ->
* owned p2p reader (count 2). This shows up in thread names as "DOM #x" (domino #x)
*/
- private static final ThreadLocal<Integer> dominoCount = new ThreadLocal<Integer>() {
- @Override
- protected Integer initialValue() {
- return 0;
- }
- };
+ private static final ThreadLocal<Integer> dominoCount = withInitial(() -> 0);
/**
* How long to wait if receiver will not accept a message before we go into queue mode.
*
* @since GemFire 4.2.2
*/
- private int asyncDistributionTimeout = 0;
+ private int asyncDistributionTimeout;
/**
* How long to wait, with the receiver not accepting any messages, before kicking the receiver out
@@ -262,7 +218,7 @@ public class Connection implements Runnable {
*
* @since GemFire 4.2.2
*/
- private int asyncQueueTimeout = 0;
+ private int asyncQueueTimeout;
/**
* How much queued data we can have, with the receiver not accepting any messages, before kicking
@@ -271,12 +227,12 @@ public class Connection implements Runnable {
*
* @since GemFire 4.2.2
*/
- private long asyncMaxQueueSize = 0;
+ private long asyncMaxQueueSize;
/**
* True if an async queue is already being filled.
*/
- private volatile boolean asyncQueuingInProgress = false;
+ private volatile boolean asyncQueuingInProgress;
/**
* Maps ConflatedKey instances to ConflatedKey instance. Note that even though the key and value
@@ -284,9 +240,11 @@ public class Connection implements Runnable {
*/
private final Map conflatedKeys = new HashMap();
- // NOTE: LinkedBlockingQueue has a bug in which removes from the queue
- // cause future offer to increase the size without adding anything to the queue.
- // So I've changed from this backport class to a java.util.LinkedList
+ /**
+ * NOTE: LinkedBlockingQueue has a bug in which removes from the queue
+ * cause future offer to increase the size without adding anything to the queue.
+ * So I've changed from this backport class to a java.util.LinkedList
+ */
private final LinkedList outgoingQueue = new LinkedList();
/**
@@ -313,8 +271,6 @@ public class Connection implements Runnable {
private volatile boolean handshakeRead;
private volatile boolean handshakeCancelled;
- private volatile int replyCode;
-
private static final byte REPLY_CODE_OK = (byte) 69;
private static final byte REPLY_CODE_OK_WITH_ASYNC_INFO = (byte) 70;
@@ -329,19 +285,19 @@ public class Connection implements Runnable {
/** set to true once a close begins */
private final AtomicBoolean closing = new AtomicBoolean(false);
- private volatile boolean readerShuttingDown = false;
+ private volatile boolean readerShuttingDown;
/** whether the socket is connected */
- volatile boolean connected = false;
+ volatile boolean connected;
/**
* Set to true once a connection finishes its constructor
*/
- private volatile boolean finishedConnecting = false;
+ private volatile boolean finishedConnecting;
private volatile boolean accessed = true;
- private volatile boolean socketInUse = false;
- volatile boolean timedOut = false;
+ private volatile boolean socketInUse;
+ volatile boolean timedOut;
/**
* task for detecting ack timeouts and issuing alerts
@@ -385,7 +341,7 @@ public class Connection implements Runnable {
private short messageId;
/** whether the length of the next message has been established */
- private boolean lengthSet = false;
+ private boolean lengthSet;
/** used to lock access to destreamer data */
private final Object destreamerLock = new Object();
@@ -402,7 +358,7 @@ public class Connection implements Runnable {
/** is this connection used for serial message delivery? */
- boolean preserveOrder = false;
+ boolean preserveOrder;
/** number of messages sent on this connection */
private long messagesSent;
@@ -416,12 +372,201 @@ public class Connection implements Runnable {
private int sendBufferSize = -1;
private int recvBufferSize = -1;
+ @MakeNotStatic
+ private static final ByteBuffer okHandshakeBuf;
+ static {
+ int msglen = 1; // one byte for reply code
+ byte[] bytes = new byte[MSG_HEADER_BYTES + msglen];
+ msglen = calcHdrSize(msglen);
+ bytes[MSG_HEADER_SIZE_OFFSET] = (byte) (msglen / 0x1000000 & 0xff);
+ bytes[MSG_HEADER_SIZE_OFFSET + 1] = (byte) (msglen / 0x10000 & 0xff);
+ bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) (msglen / 0x100 & 0xff);
+ bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
+ bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
+ bytes[MSG_HEADER_ID_OFFSET] = (byte) (MsgIdGenerator.NO_MSG_ID >> 8 & 0xff);
+ bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
+ bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
+ int allocSize = bytes.length;
+ ByteBuffer bb;
+ if (BufferPool.useDirectBuffers) {
+ bb = ByteBuffer.allocateDirect(allocSize);
+ } else {
+ bb = ByteBuffer.allocate(allocSize);
+ }
+ bb.put(bytes);
+ okHandshakeBuf = bb;
+ }
+
+ /**
+ * maximum message buffer size
+ */
+ public static final int MAX_MSG_SIZE = 0x00ffffff;
+
+ private static final int HANDSHAKE_TIMEOUT_MS =
+ Integer.getInteger("p2p.handshakeTimeoutMs", 59000);
+ // private static final byte HANDSHAKE_VERSION = 1; // 501
+ // public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
+ // public static final byte HANDSHAKE_VERSION = 3; // durable_client
+ // public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19
+ // public static final byte HANDSHAKE_VERSION = 5; // split-brain bits
+ // public static final byte HANDSHAKE_VERSION = 6; // direct ack changes
+ // NOTICE: handshake_version should not be changed anymore. Use the gemfire version transmitted
+ // with the handshake bits and handle old handshakes based on that
+ private static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
+
+ private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
+
+ private static final int CONNECT_HANDSHAKE_SIZE = 4096;
+
+ /** time between connection attempts */
+ private static final int RECONNECT_WAIT_TIME =
+ Integer.getInteger(GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000);
+
+ /**
+ * Batch sends currently should not be turned on because: 1. They will be used for all sends
+ * (instead of just no-ack) and thus will break messages that wait for a response (or kill perf).
+ * 2. The buffer is not properly flushed and closed on shutdown. The code attempts to do this but
+ * must not be doing it correctly.
+ */
+ private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
+ private static final int BATCH_BUFFER_SIZE =
+ Integer.getInteger("p2p.batchBufferSize", 1024 * 1024);
+ private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50);
+ private final Object batchLock = new Object();
+ private ByteBuffer fillBatchBuffer;
+ private ByteBuffer sendBatchBuffer;
+ private BatchBufferFlusher batchFlusher;
+
+ /**
+ * use to test message prep overhead (no socket write). WARNING: turning this on completely
+ * disables distribution of batched sends
+ */
+ private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite");
+
+ private final Object pusherSync = new Object();
+
+ private boolean disconnectRequested;
+
+ /**
+ * If true then act as if the socket buffer is full and start async queuing
+ */
+ @MutableForTesting
+ public static volatile boolean FORCE_ASYNC_QUEUE;
+
+ private static final int MAX_WAIT_TIME = 32; // ms (must be a power of 2)
+
+ /**
+ * stateLock is used to synchronize state changes.
+ */
+ private final Object stateLock = new Object();
+
+ /**
+ * for timeout processing, this is the current state of the connection
+ */
+ private byte connectionState = STATE_IDLE;
+
+ /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
+ /** the connection is idle, but may be in use */
+ private static final byte STATE_IDLE = 0;
+ /** the connection is in use and is transmitting data */
+ private static final byte STATE_SENDING = 1;
+ /** the connection is in use and is done transmitting */
+ private static final byte STATE_POST_SENDING = 2;
+ /** the connection is in use and is reading a direct-ack */
+ private static final byte STATE_READING_ACK = 3;
+ /** the connection is in use and has finished reading a direct-ack */
+ private static final byte STATE_RECEIVED_ACK = 4;
+ /** the connection is in use and is reading a message */
+ private static final byte STATE_READING = 5;
+ /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
+
+ /** set to true if we exceeded the ack-wait-threshold waiting for a response */
+ private volatile boolean ackTimedOut;
+
+ /**
+ * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
+ * done on the other side).
+ */
+ protected Connection(ConnectionTable connectionTable, Socket socket) throws ConnectionException {
+ if (connectionTable == null) {
+ throw new IllegalArgumentException("Null ConnectionTable");
+ }
+ conduit = connectionTable.getConduit();
+ isReceiver = true;
+ owner = connectionTable;
+ this.socket = socket;
+ InetSocketAddress conduitSocketId = conduit.getSocketId();
+ conduitIdStr = conduitSocketId.toString();
+ handshakeRead = false;
+ handshakeCancelled = false;
+ connected = true;
+
+ try {
+ socket.setTcpNoDelay(true);
+ socket.setKeepAlive(true);
+ setSendBufferSize(socket, SMALL_BUFFER_SIZE);
+ setReceiveBufferSize(socket);
+ } catch (SocketException e) {
+ // unable to get the settings we want. Don't log an error because it will likely happen a lot
+ }
+ }
+
+ public boolean isSharedResource() {
+ return sharedResource;
+ }
+
+ public static void makeReaderThread() {
+ // mark this thread as a reader thread
+ makeReaderThread(true);
+ }
+
+ private static void makeReaderThread(boolean v) {
+ isReaderThread.set(v);
+ }
+
+ // return true if this thread is a reader thread
+ private static boolean isReaderThread() {
+ return isReaderThread.get();
+ }
+
+ @VisibleForTesting
+ int getP2PConnectTimeout(DistributionConfig config) {
+ if (AlertingAction.isThreadAlerting()) {
+ return config.getMemberTimeout();
+ }
+ if (IS_P2P_CONNECT_TIMEOUT_INITIALIZED)
+ return P2P_CONNECT_TIMEOUT;
+ String connectTimeoutStr = System.getProperty("p2p.connectTimeout");
+ if (connectTimeoutStr != null) {
+ P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
+ } else {
+ P2P_CONNECT_TIMEOUT = 6 * config.getMemberTimeout();
+ }
+ IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
+ return P2P_CONNECT_TIMEOUT;
+ }
+
+ // marks this thread as a domino thread; returns true only if domino thread-owned sockets are enabled
+ private static boolean tipDomino() {
+ if (DOMINO_THREAD_OWNED_SOCKETS) {
+ // mark this thread as one who wants to send ALL on TO sockets
+ ConnectionTable.threadWantsOwnResources();
+ isDominoThread.set(Boolean.TRUE);
+ return true;
+ }
+ return false;
+ }
+
+ public static boolean isDominoThread() {
+ return isDominoThread.get();
+ }
+
private void setSendBufferSize(Socket sock) {
- setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
+ setSendBufferSize(sock, owner.getConduit().tcpBufferSize);
}
private void setReceiveBufferSize(Socket sock) {
- setReceiveBufferSize(sock, this.owner.getConduit().tcpBufferSize);
+ setReceiveBufferSize(sock, owner.getConduit().tcpBufferSize);
}
private void setSendBufferSize(Socket sock, int requestedSize) {
@@ -438,7 +583,7 @@ public class Connection implements Runnable {
int currentSize = send ? sock.getSendBufferSize() : sock.getReceiveBufferSize();
if (currentSize == requestedSize) {
if (send) {
- this.sendBufferSize = currentSize;
+ sendBufferSize = currentSize;
}
return;
}
@@ -452,32 +597,32 @@ public class Connection implements Runnable {
try {
int actualSize = send ? sock.getSendBufferSize() : sock.getReceiveBufferSize();
if (send) {
- this.sendBufferSize = actualSize;
+ sendBufferSize = actualSize;
} else {
- this.recvBufferSize = actualSize;
+ recvBufferSize = actualSize;
}
if (actualSize < requestedSize) {
logger.info("Socket {} is {} instead of the requested {}.",
- (send ? "send buffer size" : "receive buffer size"),
+ send ? "send buffer size" : "receive buffer size",
actualSize, requestedSize);
} else if (actualSize > requestedSize) {
if (logger.isTraceEnabled()) {
logger.trace("Socket {} buffer size is {} instead of the requested {}",
- (send ? "send" : "receive"), actualSize, requestedSize);
+ send ? "send" : "receive", actualSize, requestedSize);
}
// Remember the request size which is smaller.
// This remembered value is used for allocating direct mem buffers.
if (send) {
- this.sendBufferSize = requestedSize;
+ sendBufferSize = requestedSize;
} else {
- this.recvBufferSize = requestedSize;
+ recvBufferSize = requestedSize;
}
}
} catch (SocketException ignore) {
if (send) {
- this.sendBufferSize = requestedSize;
+ sendBufferSize = requestedSize;
} else {
- this.recvBufferSize = requestedSize;
+ recvBufferSize = requestedSize;
}
}
}
@@ -487,7 +632,7 @@ public class Connection implements Runnable {
* Returns the size of the send buffer on this connection's socket.
*/
int getSendBufferSize() {
- int result = this.sendBufferSize;
+ int result = sendBufferSize;
if (result != -1) {
return result;
}
@@ -495,52 +640,20 @@ public class Connection implements Runnable {
result = getSocket().getSendBufferSize();
} catch (SocketException ignore) {
// just return a default
- result = this.owner.getConduit().tcpBufferSize;
+ result = owner.getConduit().tcpBufferSize;
}
- this.sendBufferSize = result;
+ sendBufferSize = result;
return result;
}
- /**
- * creates a "reader" connection that we accepted (it was initiated by an explicit connect being
- * done on
- * the other side).
- */
- protected Connection(ConnectionTable t, Socket socket)
- throws ConnectionException {
- if (t == null) {
- throw new IllegalArgumentException(
- "Null ConnectionTable");
- }
- this.conduit = t.getConduit();
- this.isReceiver = true;
- this.owner = t;
- this.socket = socket;
- this.conduitIdStr = conduit.getSocketId().toString();
- this.handshakeRead = false;
- this.handshakeCancelled = false;
- this.connected = true;
-
- try {
- socket.setTcpNoDelay(true);
- socket.setKeepAlive(true);
- setSendBufferSize(socket, SMALL_BUFFER_SIZE);
- setReceiveBufferSize(socket);
- } catch (SocketException e) {
- // unable to get the settings we want. Don't log an error because it will
- // likely happen a lot
- }
- }
-
void initReceiver() {
- this.startReader(owner);
+ startReader(owner);
}
void setIdleTimeoutTask(SystemTimerTask task) {
- this.idleTask = task;
+ idleTask = task;
}
-
/**
* Returns true if an idle connection was detected.
*/
@@ -548,75 +661,39 @@ public class Connection implements Runnable {
if (isSocketClosed()) {
return true;
}
- if (isSocketInUse() || (this.sharedResource && !this.preserveOrder)) { // shared/unordered
- // connections are used
- // for failure-detection
- // and are not subject to
- // idle-timeout
+ if (isSocketInUse() || sharedResource && !preserveOrder) {
+ // shared/unordered connections are used for failure-detection
+ // and are not subject to idle-timeout
return false;
}
- boolean isIdle = !this.accessed;
- this.accessed = false;
+ boolean isIdle = !accessed;
+ accessed = false;
if (isIdle) {
- this.timedOut = true;
- this.owner.getConduit().getStats().incLostLease();
+ timedOut = true;
+ owner.getConduit().getStats().incLostLease();
if (logger.isDebugEnabled()) {
- logger.debug("Closing idle connection {} shared={} ordered={}", this, this.sharedResource,
- this.preserveOrder);
+ logger.debug("Closing idle connection {} shared={} ordered={}", this, sharedResource,
+ preserveOrder);
}
try {
- // Instead of calling requestClose
- // we call closeForReconnect.
- // We don't want this timeout close to close
- // any other connections. The problem with
- // requestClose has removeEndpoint set to true
- // which will close an receivers we have if this
- // connection is a shared one.
- closeForReconnect(
- "idle connection timed out");
+ // Instead of calling requestClose we call closeForReconnect.
+ // We don't want this timeout close to close any other connections.
+ // The problem with requestClose is that it has removeEndpoint set to true,
+ // which will close any receivers we have if this connection is a shared one.
+ closeForReconnect("idle connection timed out");
} catch (Exception ignore) {
}
}
return isIdle;
}
- @MakeNotStatic
- private static final ByteBuffer okHandshakeBuf;
- static {
- int msglen = 1; // one byte for reply code
- byte[] bytes = new byte[MSG_HEADER_BYTES + msglen];
- msglen = calcHdrSize(msglen);
- bytes[MSG_HEADER_SIZE_OFFSET] = (byte) ((msglen / 0x1000000) & 0xff);
- bytes[MSG_HEADER_SIZE_OFFSET + 1] = (byte) ((msglen / 0x10000) & 0xff);
- bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff);
- bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
- bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
- bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
- bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
- bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
- int allocSize = bytes.length;
- ByteBuffer bb;
- if (BufferPool.useDirectBuffers) {
- bb = ByteBuffer.allocateDirect(allocSize);
- } else {
- bb = ByteBuffer.allocate(allocSize);
- }
- bb.put(bytes);
- okHandshakeBuf = bb;
- }
-
- /**
- * maximum message buffer size
- */
- public static final int MAX_MSG_SIZE = 0x00ffffff;
-
static int calcHdrSize(int byteSize) {
if (byteSize > MAX_MSG_SIZE) {
throw new IllegalStateException(String.format("tcp message exceeded max size of %s",
MAX_MSG_SIZE));
}
int hdrSize = byteSize;
- hdrSize |= (HANDSHAKE_VERSION << 24);
+ hdrSize |= HANDSHAKE_VERSION << 24;
return hdrSize;
}
@@ -624,7 +701,7 @@ public class Connection implements Runnable {
return hdrSize & MAX_MSG_SIZE;
}
- static byte calcHdrVersion(int hdrSize) throws IOException {
+ static void calcHdrVersion(int hdrSize) throws IOException {
byte ver = (byte) (hdrSize >> 24);
if (ver != HANDSHAKE_VERSION) {
throw new IOException(
@@ -632,12 +709,11 @@ public class Connection implements Runnable {
"Detected wrong version of GemFire product during handshake. Expected %s but found %s",
HANDSHAKE_VERSION, ver));
}
- return ver;
}
private void sendOKHandshakeReply() throws IOException, ConnectionException {
ByteBuffer my_okHandshakeBuf;
- if (this.isReceiver) {
+ if (isReceiver) {
DistributionConfig cfg = owner.getConduit().getConfig();
ByteBuffer bb;
if (BufferPool.useDirectBuffers) {
@@ -653,8 +729,7 @@ public class Connection implements Runnable {
bb.putInt(cfg.getAsyncQueueTimeout());
bb.putInt(cfg.getAsyncMaxQueueSize());
// write own product version
- Version
- .writeOrdinal(bb, Version.CURRENT.ordinal(), true);
+ Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
// now set the msg length into position 0
bb.putInt(0, calcHdrSize(bb.position() - MSG_HEADER_BYTES));
my_okHandshakeBuf = bb;
@@ -666,19 +741,6 @@ public class Connection implements Runnable {
writeFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
}
- private static final int HANDSHAKE_TIMEOUT_MS =
- Integer.getInteger("p2p.handshakeTimeoutMs", 59000);
- // private static final byte HANDSHAKE_VERSION = 1; // 501
- // public static final byte HANDSHAKE_VERSION = 2; // cbb5x_PerfScale
- // public static final byte HANDSHAKE_VERSION = 3; // durable_client
- // public static final byte HANDSHAKE_VERSION = 4; // dataSerialMay19
- // public static final byte HANDSHAKE_VERSION = 5; // split-brain bits
- // public static final byte HANDSHAKE_VERSION = 6; // direct ack changes
- // NOTICE: handshake_version should not be changed anymore. Use the gemfire
- // version transmitted with the handshake bits and handle old handshakes
- // based on that
- private static final byte HANDSHAKE_VERSION = 7; // product version exchange during handshake
-
/**
* @throws ConnectionException if the conduit has stopped
*/
@@ -686,73 +748,69 @@ public class Connection implements Runnable {
boolean needToClose = false;
String reason = null;
try {
- synchronized (this.handshakeSync) {
- if (!this.handshakeRead && !this.handshakeCancelled) {
- boolean success = false;
+ synchronized (handshakeSync) {
+ if (!handshakeRead && !handshakeCancelled) {
reason = "unknown";
boolean interrupted = Thread.interrupted();
+ boolean success = false;
try {
final long endTime = System.currentTimeMillis() + HANDSHAKE_TIMEOUT_MS;
long msToWait = HANDSHAKE_TIMEOUT_MS;
- while (!this.handshakeRead && !this.handshakeCancelled && msToWait > 0) {
- this.handshakeSync.wait(msToWait); // spurious wakeup ok
- if (!this.handshakeRead && !this.handshakeCancelled) {
+ while (!handshakeRead && !handshakeCancelled && msToWait > 0) {
+ handshakeSync.wait(msToWait); // spurious wakeup ok
+ if (!handshakeRead && !handshakeCancelled) {
msToWait = endTime - System.currentTimeMillis();
}
}
- if (!this.handshakeRead && !this.handshakeCancelled) {
+ if (!handshakeRead && !handshakeCancelled) {
reason = "handshake timed out";
String peerName;
- if (this.remoteAddr != null) {
- peerName = this.remoteAddr.toString();
+ if (remoteAddr != null) {
+ peerName = remoteAddr.toString();
// late in the life of jdk 1.7 we started seeing connections accepted
// when accept() was not even being called. This started causing timeouts
// to occur in the handshake threads instead of causing failures in
// connection-formation. So, we need to initiate suspect processing here
- owner.getDM().getDistribution().suspectMember(this.remoteAddr,
+ owner.getDM().getDistribution().suspectMember(remoteAddr,
String.format(
"Connection handshake with %s timed out after waiting %s milliseconds.",
-
peerName, HANDSHAKE_TIMEOUT_MS));
} else {
- peerName = "socket " + this.socket.getRemoteSocketAddress().toString() + ":"
- + this.socket.getPort();
+ peerName = "socket " + socket.getRemoteSocketAddress() + ":" + socket.getPort();
}
throw new ConnectionException(
String.format(
"Connection handshake with %s timed out after waiting %s milliseconds.",
peerName, HANDSHAKE_TIMEOUT_MS));
- } else {
- success = this.handshakeRead;
}
+ success = handshakeRead;
} catch (InterruptedException ex) {
interrupted = true;
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
reason = "interrupted";
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
if (success) {
- if (this.isReceiver) {
+ if (isReceiver) {
needToClose =
- !owner.getConduit().getMembership().addSurpriseMember(this.remoteAddr);
+ !owner.getConduit().getMembership().addSurpriseMember(remoteAddr);
if (needToClose) {
reason = "this member is shunned";
}
}
} else {
- needToClose = true; // for bug 42159
+ needToClose = true;
}
}
- } // !handshakeRead
- } // synchronized
+ }
+ }
} finally {
if (needToClose) {
- // moved this call outside of the sync for bug 42159
try {
- requestClose(reason); // fix for bug 31546
+ requestClose(reason);
} catch (Exception ignore) {
}
}
@@ -765,18 +823,16 @@ public class Connection implements Runnable {
ByteBuffer buffer = ioFilter.getUnwrappedBuffer(inputBuffer);
buffer.position(0).limit(0);
}
- synchronized (this.handshakeSync) {
+ synchronized (handshakeSync) {
if (success) {
- this.handshakeRead = true;
+ handshakeRead = true;
} else {
- this.handshakeCancelled = true;
+ handshakeCancelled = true;
}
- this.handshakeSync.notify();
+ handshakeSync.notifyAll();
}
}
- private final AtomicBoolean asyncCloseCalled = new AtomicBoolean();
-
/**
* asynchronously close this connection
*
@@ -786,18 +842,18 @@ public class Connection implements Runnable {
// note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
// we do the close in a background thread because the operation may hang if
- // there is a problem with the network. See bug #46659
+ // there is a problem with the network
// if simulating sickness, sockets must be closed in-line so that tests know
// that the vm is sick when the beSick operation completes
if (beingSickForTests) {
prepareForAsyncClose();
} else {
- if (this.asyncCloseCalled.compareAndSet(false, true)) {
- Socket s = this.socket;
+ if (asyncCloseCalled.compareAndSet(false, true)) {
+ Socket s = socket;
if (s != null && !s.isClosed()) {
prepareForAsyncClose();
- this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr),
+ owner.getSocketCloser().asyncClose(s, String.valueOf(remoteAddr),
() -> ioFilter.close(s.getChannel()));
}
}
@@ -813,31 +869,29 @@ public class Connection implements Runnable {
}
}
- private static final int CONNECT_HANDSHAKE_SIZE = 4096;
-
/**
* waits until we've joined the distributed system before returning
*/
private void waitForAddressCompletion() {
- InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
+ InternalDistributedMember myAddr = owner.getConduit().getMemberId();
synchronized (myAddr) {
- while ((!owner.getConduit().getCancelCriterion().isCancelInProgress())
+ while (!owner.getConduit().getCancelCriterion().isCancelInProgress()
&& myAddr.getInetAddress() == null && myAddr.getVmViewId() < 0) {
try {
myAddr.wait(100); // spurious wakeup ok
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
}
}
- Assert.assertTrue(myAddr.getDirectChannelPort() == this.owner.getConduit().getPort());
+ Assert.assertTrue(myAddr.getDirectChannelPort() == owner.getConduit().getPort());
}
}
private void handshakeFromNewSender() throws IOException {
waitForAddressCompletion();
- InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
+ InternalDistributedMember myAddr = owner.getConduit().getMemberId();
final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
/*
* Note a byte of zero is always written because old products serialized a member id with always
@@ -848,30 +902,20 @@ public class Connection implements Runnable {
connectHandshake.writeByte(HANDSHAKE_VERSION);
// NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION
InternalDataSerializer.invokeToData(myAddr, connectHandshake);
- connectHandshake.writeBoolean(this.sharedResource);
- connectHandshake.writeBoolean(this.preserveOrder);
- connectHandshake.writeLong(this.uniqueId);
+ connectHandshake.writeBoolean(sharedResource);
+ connectHandshake.writeBoolean(preserveOrder);
+ connectHandshake.writeLong(uniqueId);
// write the product version ordinal
Version.CURRENT.writeOrdinal(connectHandshake, true);
connectHandshake.writeInt(dominoCount.get() + 1);
// this writes the sending member + thread name that is stored in senderName
// on the receiver to show the cause of reader thread creation
- // if (dominoCount.get() > 0) {
- // connectHandshake.writeUTF(Thread.currentThread().getName());
- // } else {
- // String name = owner.getDM().getConfig().getName();
- // if (name == null) {
- // name = "pid="+OSProcess.getId();
- // }
- // connectHandshake.writeUTF("["+name+"] "+Thread.currentThread().getName());
- // }
connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR,
MsgIdGenerator.NO_MSG_ID);
writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null);
}
/**
- *
* @throws IOException if handshake fails
*/
private void attemptHandshake(ConnectionTable connTable) throws IOException {
@@ -885,32 +929,28 @@ public class Connection implements Runnable {
waitForHandshake(); // waiting for reply
}
- /** time between connection attempts */
- private static final int RECONNECT_WAIT_TIME = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "RECONNECT_WAIT_TIME", 2000);
-
/**
* creates a new connection to a remote server. We are initiating this connection; the other side
* must accept us We will almost always send messages; small acks are received.
*/
- protected static Connection createSender(final Membership mgr, final ConnectionTable t,
+ static Connection createSender(final Membership mgr, final ConnectionTable t,
final boolean preserveOrder, final DistributedMember remoteAddr, final boolean sharedResource,
final long startTime, final long ackTimeout, final long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
- boolean warningPrinted = false;
boolean success = false;
- boolean firstTime = true;
Connection conn = null;
// keep trying. Note that this may be executing during the shutdown window
// where a cancel criterion has not been established, but threads are being
// interrupted. In this case we must allow the connection to succeed even
// though subsequent messaging using the socket may fail
boolean interrupted = Thread.interrupted();
- boolean severeAlertIssued = false;
- boolean suspected = false;
- long reconnectWaitTime = RECONNECT_WAIT_TIME;
- boolean connectionErrorLogged = false;
try {
+ boolean connectionErrorLogged = false;
+ long reconnectWaitTime = RECONNECT_WAIT_TIME;
+ boolean suspected = false;
+ boolean severeAlertIssued = false;
+ boolean firstTime = true;
+ boolean warningPrinted = false;
while (!success) { // keep trying
// Quit if DM has stopped distribution
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -925,9 +965,9 @@ public class Connection implements Runnable {
} else if (!suspected) {
if (remoteAddr != null) {
logger.warn("Unable to form a TCP/IP connection to {} in over {} seconds",
- remoteAddr, (ackTimeout) / 1000);
+ remoteAddr, ackTimeout / 1000);
}
- mgr.suspectMember((InternalDistributedMember) remoteAddr,
+ mgr.suspectMember(remoteAddr,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
}
@@ -936,9 +976,9 @@ public class Connection implements Runnable {
if (reconnectWaitTime <= 0) {
reconnectWaitTime = RECONNECT_WAIT_TIME;
}
- } else if (!suspected && (startTime > 0) && (ackTimeout > 0)
- && (startTime + ackTimeout < now)) {
- mgr.suspectMember((InternalDistributedMember) remoteAddr,
+ } else if (!suspected && startTime > 0 && ackTimeout > 0
+ && startTime + ackTimeout < now) {
+ mgr.suspectMember(remoteAddr,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
}
@@ -980,7 +1020,7 @@ public class Connection implements Runnable {
try {
conn = null;
conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
- } catch (javax.net.ssl.SSLHandshakeException se) {
+ } catch (SSLHandshakeException se) {
// no need to retry if certificates were rejected
throw se;
} catch (IOException ioe) {
@@ -1013,8 +1053,7 @@ public class Connection implements Runnable {
// and the socket was closed or we were sent
// ShutdownMessage
if (giveUpOnMember(mgr, remoteAddr)) {
- throw new IOException(String.format("Member %s left the group",
- remoteAddr));
+ throw new IOException(String.format("Member %s left the group", remoteAddr));
}
t.getConduit().getCancelCriterion().checkCancelInProgress(null);
// no success but no need to log; just retry
@@ -1069,11 +1108,9 @@ public class Connection implements Runnable {
}
}
}
- // Assert.assertTrue(conn != null);
if (conn == null) {
throw new ConnectionException(
- String.format("Connection: failed construction for peer %s",
- remoteAddr));
+ String.format("Connection: failed construction for peer %s", remoteAddr));
}
if (preserveOrder && BATCH_SENDS) {
conn.createBatchSendBuffer();
@@ -1082,60 +1119,49 @@ public class Connection implements Runnable {
return conn;
}
- private static boolean giveUpOnMember(Membership mgr,
- DistributedMember remoteAddr) {
+ private static boolean giveUpOnMember(Membership mgr, DistributedMember remoteAddr) {
return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
}
- private void setRemoteAddr(DistributedMember m) {
- this.remoteAddr = this.owner.getDM().getCanonicalId(m);
- Membership mgr = this.conduit.getMembership();
- mgr.addSurpriseMember(m);
- }
-
/**
* creates a new connection to a remote server. We are initiating this connection; the other side
* must accept us We will almost always send messages; small acks are received.
*/
private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
-
// initialize a socket upfront. So that the
InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
if (t == null) {
- throw new IllegalArgumentException(
- "ConnectionTable is null.");
+ throw new IllegalArgumentException("ConnectionTable is null.");
}
- this.conduit = t.getConduit();
- this.isReceiver = false;
- this.owner = t;
+ conduit = t.getConduit();
+ isReceiver = false;
+ owner = t;
this.sharedResource = sharedResource;
this.preserveOrder = preserveOrder;
- setRemoteAddr(remoteAddr);
- this.conduitIdStr = this.owner.getConduit().getSocketId().toString();
- this.handshakeRead = false;
- this.handshakeCancelled = false;
- this.connected = true;
+ this.remoteAddr = remoteAddr;
+ conduitIdStr = owner.getConduit().getSocketId().toString();
+ handshakeRead = false;
+ handshakeCancelled = false;
+ connected = true;
- this.uniqueId = idCounter.getAndIncrement();
+ uniqueId = ID_COUNTER.getAndIncrement();
// connect to listening socket
InetSocketAddress addr =
new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort());
SocketChannel channel = SocketChannel.open();
- this.owner.addConnectingSocket(channel.socket(), addr.getAddress());
+ owner.addConnectingSocket(channel.socket(), addr.getAddress());
try {
channel.socket().setTcpNoDelay(true);
channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
- /*
- * If conserve-sockets is false, the socket can be used for receiving responses, so set the
- * receive buffer accordingly.
- */
+ // If conserve-sockets is false, the socket can be used for receiving responses, so set the
+ // receive buffer accordingly.
if (!sharedResource) {
- setReceiveBufferSize(channel.socket(), this.owner.getConduit().tcpBufferSize);
+ setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
} else {
setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
// receive ack messages
@@ -1146,231 +1172,98 @@ public class Connection implements Runnable {
int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
try {
-
- channel.socket().connect(addr, connectTime);
-
- createIoFilter(channel, true);
-
- } catch (NullPointerException e) {
- // bug #45044 - jdk 1.7 sometimes throws an NPE here
- ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
- c.initCause(e);
- // prevent a hot loop by sleeping a little bit
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- throw c;
- } catch (SSLException e) {
- ConnectException c = new ConnectException("Problem connecting to peer " + addr);
- c.initCause(e);
- throw c;
- } catch (CancelledKeyException | ClosedSelectorException e) {
- // bug #44469: for some reason NIO throws this runtime exception
- // instead of an IOException on timeouts
- ConnectException c = new ConnectException(
- String.format("Attempt timed out after %s milliseconds",
- connectTime));
- c.initCause(e);
- throw c;
- }
- } finally {
- this.owner.removeConnectingSocket(channel.socket());
- }
- this.socket = channel.socket();
-
- if (logger.isDebugEnabled()) {
- logger.debug("Connection: connected to {} with IP address {}", remoteAddr, addr);
- }
- try {
- getSocket().setTcpNoDelay(true);
- } catch (SocketException ignored) {
- }
- }
-
- /**
- * Batch sends currently should not be turned on because: 1. They will be used for all sends
- * (instead of just no-ack) and thus will break messages that wait for a response (or kill perf).
- * 2. The buffer is not properly flushed and closed on shutdown. The code attempts to do this but
- * must not be doing it correctly.
- */
- private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
- private static final int BATCH_BUFFER_SIZE =
- Integer.getInteger("p2p.batchBufferSize", 1024 * 1024);
- private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50);
- private final Object batchLock = new Object();
- private ByteBuffer fillBatchBuffer;
- private ByteBuffer sendBatchBuffer;
- private BatchBufferFlusher batchFlusher;
-
- private void createBatchSendBuffer() {
- if (BufferPool.useDirectBuffers) {
- this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
- this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
- } else {
- this.fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
- this.sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
- }
- this.batchFlusher = new BatchBufferFlusher();
- this.batchFlusher.start();
- }
-
- void cleanUpOnIdleTaskCancel() {
- // Make sure receivers are removed from the connection table, this should always be a noop, but
- // is done here as a failsafe.
- if (isReceiver) {
- owner.removeReceiver(this);
- }
- }
-
- public void setInputBuffer(ByteBuffer buffer) {
- this.inputBuffer = buffer;
- }
-
- private class BatchBufferFlusher extends Thread {
- private volatile boolean flushNeeded = false;
- private volatile boolean timeToStop = false;
- private DMStats stats;
-
-
- BatchBufferFlusher() {
- setDaemon(true);
- this.stats = owner.getConduit().getStats();
- }
-
- /**
- * Called when a message writer needs the current fillBatchBuffer flushed
- */
- void flushBuffer(ByteBuffer bb) {
- final long start = DistributionStats.getStatTime();
- try {
- synchronized (this) {
- synchronized (batchLock) {
- if (bb != fillBatchBuffer) {
- // it must have already been flushed. So just return
- // and use the new fillBatchBuffer
- return;
- }
- }
- this.flushNeeded = true;
- this.notify();
- }
- synchronized (batchLock) {
- // Wait for the flusher thread
- while (bb == fillBatchBuffer) {
- Connection.this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
- boolean interrupted = Thread.interrupted();
- try {
- batchLock.wait(); // spurious wakeup ok
- } catch (InterruptedException ex) {
- interrupted = true;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- } // while
- }
- } finally {
- owner.getConduit().getStats().incBatchWaitTime(start);
- }
- }
-
- public void close() {
- synchronized (this) {
- this.timeToStop = true;
- this.flushNeeded = true;
- this.notify();
- }
- }
-
- @Override
- public void run() {
- try {
- synchronized (this) {
- while (!timeToStop) {
- if (!this.flushNeeded && fillBatchBuffer.position() <= (BATCH_BUFFER_SIZE / 2)) {
- wait(BATCH_FLUSH_MS); // spurious wakeup ok
- }
- if (this.flushNeeded || fillBatchBuffer.position() > (BATCH_BUFFER_SIZE / 2)) {
- final long start = DistributionStats.getStatTime();
- synchronized (batchLock) {
- // This is the only block of code that will swap
- // the buffer references
- this.flushNeeded = false;
- ByteBuffer tmp = fillBatchBuffer;
- fillBatchBuffer = sendBatchBuffer;
- sendBatchBuffer = tmp;
- batchLock.notifyAll();
- }
- // We now own the sendBatchBuffer
- if (sendBatchBuffer.position() > 0) {
- final boolean origSocketInUse = socketInUse;
- socketInUse = true;
- try {
- sendBatchBuffer.flip();
- SocketChannel channel = getSocket().getChannel();
- writeFully(channel, sendBatchBuffer, false, null);
- sendBatchBuffer.clear();
- } catch (IOException | ConnectionException ex) {
- logger.fatal("Exception flushing batch send buffer: %s", ex);
- readerShuttingDown = true;
- requestClose(String.format("Exception flushing batch send buffer: %s",
- ex));
- } finally {
- accessed();
- socketInUse = origSocketInUse;
- }
- }
- this.stats.incBatchFlushTime(start);
- }
- }
+
+ channel.socket().connect(addr, connectTime);
+
+ createIoFilter(channel, true);
+
+ } catch (NullPointerException e) {
+ // jdk 1.7 sometimes throws an NPE here
+ ConnectException c = new ConnectException("Encountered bug #45044 - retrying");
+ c.initCause(e);
+ // prevent a hot loop by sleeping a little bit
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
- } catch (InterruptedException ex) {
- // time for this thread to shutdown
- // Thread.currentThread().interrupt();
+ throw c;
+ } catch (SSLException e) {
+ ConnectException c = new ConnectException("Problem connecting to peer " + addr);
+ c.initCause(e);
+ throw c;
+ } catch (CancelledKeyException | ClosedSelectorException e) {
+ // for some reason NIO throws this runtime exception instead of an IOException on timeouts
+ ConnectException c = new ConnectException(
+ String.format("Attempt timed out after %s milliseconds", connectTime));
+ c.initCause(e);
+ throw c;
}
+ } finally {
+ owner.removeConnectingSocket(channel.socket());
+ }
+ socket = channel.socket();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection: connected to {} with IP address {}", remoteAddr, addr);
+ }
+ try {
+ getSocket().setTcpNoDelay(true);
+ } catch (SocketException ignored) {
}
}
- private void closeBatchBuffer() {
- if (this.batchFlusher != null) {
- this.batchFlusher.close();
+ private void createBatchSendBuffer() {
+ if (BufferPool.useDirectBuffers) {
+ fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
+ sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE);
+ } else {
+ fillBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
+ sendBatchBuffer = ByteBuffer.allocate(BATCH_BUFFER_SIZE);
}
+ batchFlusher = new BatchBufferFlusher();
+ batchFlusher.start();
}
- /**
- * use to test message prep overhead (no socket write). WARNING: turning this on completely
- * disables distribution of batched sends
- */
- private static final boolean SOCKET_WRITE_DISABLED = Boolean.getBoolean("p2p.disableSocketWrite");
+ void cleanUpOnIdleTaskCancel() {
+ // Make sure receivers are removed from the connection table, this should always be a noop, but
+ // is done here as a fail safe.
+ if (isReceiver) {
+ owner.removeReceiver(this);
+ }
+ }
- private void batchSend(ByteBuffer src) throws IOException {
+ private void closeBatchBuffer() {
+ if (batchFlusher != null) {
+ batchFlusher.close();
+ }
+ }
+
+ private void batchSend(ByteBuffer src) {
if (SOCKET_WRITE_DISABLED) {
return;
}
final long start = DistributionStats.getStatTime();
try {
- ByteBuffer dst = null;
Assert.assertTrue(src.remaining() <= BATCH_BUFFER_SIZE, "Message size(" + src.remaining()
+ ") exceeded BATCH_BUFFER_SIZE(" + BATCH_BUFFER_SIZE + ")");
do {
- synchronized (this.batchLock) {
- dst = this.fillBatchBuffer;
+ ByteBuffer dst;
+ synchronized (batchLock) {
+ dst = fillBatchBuffer;
if (src.remaining() <= dst.remaining()) {
final long copyStart = DistributionStats.getStatTime();
dst.put(src);
- this.owner.getConduit().getStats().incBatchCopyTime(copyStart);
+ owner.getConduit().getStats().incBatchCopyTime(copyStart);
return;
}
}
// If we got this far then we do not have room in the current
// buffer and need the flusher thread to flush before we can fill it
- this.batchFlusher.flushBuffer(dst);
+ batchFlusher.flushBuffer(dst);
} while (true);
} finally {
- this.owner.getConduit().getStats().incBatchSendTime(start);
+ owner.getConduit().getStats().incBatchSendTime(start);
}
}
@@ -1383,7 +1276,7 @@ public class Connection implements Runnable {
}
boolean isClosing() {
- return this.closing.get();
+ return closing.get();
}
void closePartialConnect(String reason, boolean beingSick) {
@@ -1403,107 +1296,97 @@ public class Connection implements Runnable {
*
* @see #requestClose
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "TLW_TWO_LOCK_WAIT")
+ @SuppressWarnings("TLW_TWO_LOCK_WAIT")
private void close(String reason, boolean cleanupEndpoint, boolean p_removeEndpoint,
boolean beingSick, boolean forceRemoval) {
- boolean removeEndpoint = p_removeEndpoint;
- // use getAndSet outside sync on this to fix 42330
- boolean onlyCleanup = this.closing.getAndSet(true);
+ // use getAndSet outside sync on this
+ boolean onlyCleanup = closing.getAndSet(true);
if (onlyCleanup && !forceRemoval) {
return;
}
+ boolean removeEndpoint = p_removeEndpoint;
if (!onlyCleanup) {
synchronized (this) {
- this.stopped = true;
- if (this.connected) {
- if (this.asyncQueuingInProgress && this.pusherThread != Thread.currentThread()) {
+ stopped = true;
+ if (connected) {
+ if (asyncQueuingInProgress && pusherThread != Thread.currentThread()) {
// We don't need to do this if we are the pusher thread
// and we have determined that we need to close the connection.
- // See bug 37601.
- synchronized (this.outgoingQueue) {
+ synchronized (outgoingQueue) {
// wait for the flusher to complete (it may timeout)
- while (this.asyncQueuingInProgress) {
- // Don't do this: causes closes to not get done in the event
- // of an orderly shutdown:
- // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ while (asyncQueuingInProgress) {
boolean interrupted = Thread.interrupted();
try {
- this.outgoingQueue.wait(); // spurious wakeup ok
+ outgoingQueue.wait(); // spurious wakeup ok
} catch (InterruptedException ie) {
interrupted = true;
- // this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
- } // while
- } // synchronized
+ }
+ }
}
- this.connected = false;
+ connected = false;
closeSenderSem();
- {
- final DMStats stats = this.owner.getConduit().getStats();
- if (this.finishedConnecting) {
- if (this.isReceiver) {
- stats.decReceivers();
- } else {
- stats.decSenders(this.sharedResource, this.preserveOrder);
- }
+
+ final DMStats stats = owner.getConduit().getStats();
+ if (finishedConnecting) {
+ if (isReceiver) {
+ stats.decReceivers();
+ } else {
+ stats.decSenders(sharedResource, preserveOrder);
}
}
+
} else if (!forceRemoval) {
removeEndpoint = false;
}
// make sure our socket is closed
asyncClose(false);
- if (!this.isReceiver) {
+ if (!isReceiver) {
// receivers release the input buffer when exiting run(). Senders use the
// inputBuffer for reading direct-reply responses
releaseInputBuffer();
}
lengthSet = false;
- } // synchronized
+ }
- // moved the call to notifyHandshakeWaiter out of the above
- // synchronized block to fix bug #42159
// Make sure anyone waiting for a handshake stops waiting
notifyHandshakeWaiter(false);
- // wait a bit for the our reader thread to exit
- // don't wait if we are the reader thread
+ // wait a bit for our reader thread to exit; don't wait if we are the reader thread
boolean isIBM = false;
// if network partition detection is enabled or this is an admin vm
- // we can't wait for the reader thread when running in an IBM JRE. See
- // bug 41889
- if (this.conduit.getConfig().getEnableNetworkPartitionDetection()
- || this.conduit.getMemberId().getVmKind() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
- || this.conduit.getMemberId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
+ // we can't wait for the reader thread when running in an IBM JRE
+ if (conduit.getConfig().getEnableNetworkPartitionDetection()
+ || conduit.getMemberId().getVmKind() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
+ || conduit.getMemberId().getVmKind() == ClusterDistributionManager.LOCATOR_DM_TYPE) {
isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
}
- {
- // Now that readerThread is returned to a pool after we close
- // we need to be more careful not to join on a thread that belongs
- // to someone else.
- Thread readerThreadSnapshot = this.readerThread;
- if (!beingSick && readerThreadSnapshot != null && !isIBM && this.isRunning
- && !this.readerShuttingDown && readerThreadSnapshot != Thread.currentThread()) {
- try {
- readerThreadSnapshot.join(500);
- readerThreadSnapshot = this.readerThread;
- if (this.isRunning && !this.readerShuttingDown && readerThreadSnapshot != null
- && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system
- // failure
- readerThreadSnapshot.join(1500);
- if (this.isRunning) {
- logger.info("Timed out waiting for readerThread on {} to finish.",
- this);
- }
+
+ // Now that readerThread is returned to a pool after we close
+ // we need to be more careful not to join on a thread that belongs
+ // to someone else.
+ Thread readerThreadSnapshot = readerThread;
+ if (!beingSick && readerThreadSnapshot != null && !isIBM && isRunning
+ && !readerShuttingDown && readerThreadSnapshot != Thread.currentThread()) {
+ try {
+ readerThreadSnapshot.join(500);
+ readerThreadSnapshot = readerThread;
+ if (isRunning && !readerShuttingDown && readerThreadSnapshot != null
+ && owner.getDM().getRootCause() == null) {
+ // don't wait twice if there's a system failure
+ readerThreadSnapshot.join(1500);
+ if (isRunning) {
+ logger.info("Timed out waiting for readerThread on {} to finish.",
+ this);
}
- } catch (IllegalThreadStateException ignore) {
- // ignored - thread already stopped
- } catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- // but keep going, we're trying to close.
}
+ } catch (IllegalThreadStateException ignore) {
+ // ignored - thread already stopped
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ // but keep going, we're trying to close.
}
}
@@ -1511,43 +1394,40 @@ public class Connection implements Runnable {
closeAllMsgDestreamers();
}
if (cleanupEndpoint) {
- if (this.isReceiver) {
- this.owner.removeReceiver(this);
+ if (isReceiver) {
+ owner.removeReceiver(this);
}
if (removeEndpoint) {
- if (this.sharedResource) {
- if (!this.preserveOrder) {
- // only remove endpoint when shared unordered connection
- // is closed. This is part of the fix for bug 32980.
- if (!this.isReceiver) {
+ if (sharedResource) {
+ if (!preserveOrder) {
+ // only remove endpoint when shared unordered connection is closed
+ if (!isReceiver) {
// Only remove endpoint if sender.
- if (this.finishedConnecting) {
+ if (finishedConnecting) {
// only remove endpoint if our constructor finished
- this.owner.removeEndpoint(this.remoteAddr, reason);
+ owner.removeEndpoint(remoteAddr, reason);
}
}
} else {
// noinspection ConstantConditions
- this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
+ owner.removeSharedConnection(reason, remoteAddr, preserveOrder, this);
}
- } else if (!this.isReceiver) {
- this.owner.removeThreadConnection(this.remoteAddr, this);
+ } else if (!isReceiver) {
+ owner.removeThreadConnection(remoteAddr, this);
}
} else {
- // This code is ok to do even if the ConnectionTable
- // has never added this Connection to its maps since
- // the calls in this block use our identity to do the removes.
- if (this.sharedResource) {
- this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
- } else if (!this.isReceiver) {
- this.owner.removeThreadConnection(this.remoteAddr, this);
+ // This code is ok to do even if the ConnectionTable has never added this Connection to its
+ // maps since the calls in this block use our identity to do the removes.
+ if (sharedResource) {
+ owner.removeSharedConnection(reason, remoteAddr, preserveOrder, this);
+ } else if (!isReceiver) {
+ owner.removeThreadConnection(remoteAddr, this);
}
}
}
- // This cancels the idle timer task, but it also removes the tasks
- // reference to this connection, freeing up the connection (and it's buffers
- // for GC sooner.
+ // This cancels the idle timer task, but it also removes the task's reference to this
+ // connection, freeing up the connection (and its buffers) for GC sooner.
if (idleTask != null) {
idleTask.cancel();
}
@@ -1555,69 +1435,66 @@ public class Connection implements Runnable {
if (ackTimeoutTask != null) {
ackTimeoutTask.cancel();
}
-
}
- /** starts a reader thread */
+ /**
+ * starts a reader thread
+ */
private void startReader(ConnectionTable connTable) {
if (logger.isDebugEnabled()) {
logger.debug("Starting thread for " + p2pReaderName());
}
- Assert.assertTrue(!this.isRunning);
+ Assert.assertTrue(!isRunning);
stopped = false;
- this.isRunning = true;
+ isRunning = true;
connTable.executeCommand(this);
}
-
/**
* in order to read non-NIO socket-based messages we need to have a thread actively trying to grab
* bytes out of the sockets input queue. This is that thread.
*/
@Override
public void run() {
- this.readerThread = Thread.currentThread();
- this.readerThread.setName(p2pReaderName());
+ readerThread = Thread.currentThread();
+ readerThread.setName(p2pReaderName());
ConnectionTable.threadWantsSharedResources();
- makeReaderThread(this.isReceiver);
+ makeReaderThread(isReceiver);
try {
readMessages();
} finally {
- // bug36060: do the socket close within a finally block
+ // do the socket close within a finally block
if (logger.isDebugEnabled()) {
logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr);
}
- if (this.isReceiver) {
+ if (isReceiver) {
try {
initiateSuspicionIfSharedUnordered();
} catch (CancelException e) {
// shutting down
}
- if (!this.sharedResource) {
- this.conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get());
+ if (!sharedResource) {
+ conduit.getStats().incThreadOwnedReceivers(-1L, dominoCount.get());
}
asyncClose(false);
- this.owner.removeAndCloseThreadOwnedSockets();
+ owner.removeAndCloseThreadOwnedSockets();
}
releaseInputBuffer();
- // make sure that if the reader thread exits we notify a thread waiting
- // for the handshake.
- // see bug 37524 for an example of listeners hung in waitForHandshake
+ // make sure that if the reader thread exits we notify a thread waiting for the handshake.
notifyHandshakeWaiter(false);
- this.readerThread.setName("unused p2p reader");
- synchronized (this.stateLock) {
- this.isRunning = false;
- this.readerThread = null;
+ readerThread.setName("unused p2p reader");
+ synchronized (stateLock) {
+ isRunning = false;
+ readerThread = null;
}
- } // finally
+ }
}
private void releaseInputBuffer() {
- ByteBuffer tmp = this.inputBuffer;
+ ByteBuffer tmp = inputBuffer;
if (tmp != null) {
- this.inputBuffer = null;
- final MembershipStatistics stats = this.owner.getConduit().getStats();
+ inputBuffer = null;
getBufferPool().releaseReceiveBuffer(tmp);
}
}
@@ -1628,22 +1505,22 @@ public class Connection implements Runnable {
private String p2pReaderName() {
StringBuilder sb = new StringBuilder(64);
- if (this.isReceiver) {
+ if (isReceiver) {
sb.append(THREAD_KIND_IDENTIFIER + "@");
- } else if (this.handshakeRead) {
+ } else if (handshakeRead) {
sb.append("P2P message sender@");
} else {
sb.append("P2P handshake reader@");
}
sb.append(Integer.toHexString(System.identityHashCode(this)));
- if (!this.isReceiver) {
+ if (!isReceiver) {
sb.append('-').append(getUniqueId());
}
return sb.toString();
}
private void readMessages() {
- // take a snapshot of uniqueId to detect reconnect attempts; see bug 37592
+ // take a snapshot of uniqueId to detect reconnect attempts
SocketChannel channel;
try {
channel = getSocket().getChannel();
@@ -1654,66 +1531,58 @@ public class Connection implements Runnable {
}
channel.configureBlocking(true);
} catch (ClosedChannelException e) {
- // bug 37693: the channel was asynchronously closed. Our work
- // is done.
+ // the channel was asynchronously closed. Our work is done.
try {
- requestClose(
- "readMessages caught closed channel");
+ requestClose("readMessages caught closed channel");
} catch (Exception ignore) {
}
- return; // exit loop and thread
+ // exit loop and thread
+ return;
} catch (IOException ex) {
if (stopped || owner.getConduit().getCancelCriterion().isCancelInProgress()) {
try {
- requestClose(
- "readMessages caught shutdown");
+ requestClose("readMessages caught shutdown");
} catch (Exception ignore) {
}
- return; // bug37520: exit loop (and thread)
+ // exit loop (and thread)
+ return;
}
logger.info("Failed initializing socket for message {}: {}",
- (this.isReceiver ? "receiver" : "sender"), ex.getMessage());
- this.readerShuttingDown = true;
+ isReceiver ? "receiver" : "sender", ex.getMessage());
+ readerShuttingDown = true;
try {
- requestClose(String.format("Failed initializing socket %s",
- ex));
+ requestClose(String.format("Failed initializing socket %s", ex));
} catch (Exception ignore) {
}
return;
}
if (!stopped) {
- // Assert.assertTrue(owner != null, "How did owner become null");
if (logger.isDebugEnabled()) {
logger.debug("Starting {} on {}", p2pReaderName(), socket);
}
}
// we should not change the state of the connection if we are a handshake reader thread
// as there is a race between this thread and the application thread doing direct ack
- // fix for #40869
boolean isHandShakeReader = false;
// if we're using SSL/TLS the input buffer may already have data to process
boolean skipInitialRead = getInputBuffer().position() > 0;
- boolean isInitialRead = true;
try {
- for (;;) {
+ for (boolean isInitialRead = true;;) {
if (stopped) {
break;
}
if (SystemFailure.getFailure() != null) {
// Allocate no objects here!
- Socket s = this.socket;
- if (s != null) {
- try {
- ioFilter.close(s.getChannel());
- s.close();
- } catch (IOException e) {
- // don't care
- }
+ try {
+ ioFilter.close(socket.getChannel());
+ socket.close();
+ } catch (IOException e) {
+ // don't care
}
- SystemFailure.checkFailure(); // throws
+ SystemFailure.checkFailure();
}
- if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
+ if (owner.getConduit().getCancelCriterion().isCancelInProgress()) {
break;
}
@@ -1740,7 +1609,7 @@ public class Connection implements Runnable {
continue;
}
if (amountRead < 0) {
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
try {
requestClose("SocketChannel.read returned EOF");
} catch (Exception e) {
@@ -1751,9 +1620,9 @@ public class Connection implements Runnable {
processInputBuffer();
- if (!this.isReceiver && (this.handshakeRead || this.handshakeCancelled)) {
+ if (!isReceiver && (handshakeRead || handshakeCancelled)) {
if (logger.isDebugEnabled()) {
- if (this.handshakeRead) {
+ if (handshakeRead) {
logger.debug("handshake has been read {}", this);
} else {
logger.debug("handshake has been cancelled {}", this);
@@ -1767,26 +1636,22 @@ public class Connection implements Runnable {
if (logger.isDebugEnabled()) {
logger.debug("{} Terminated <{}> due to cancellation", p2pReaderName(), this, e);
}
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
try {
- requestClose(
- String.format("CacheClosed in channel read: %s", e));
+ requestClose(String.format("CacheClosed in channel read: %s", e));
} catch (Exception ignored) {
}
return;
} catch (ClosedChannelException e) {
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
try {
- requestClose(String.format("ClosedChannelException in channel read: %s",
- e));
+ requestClose(String.format("ClosedChannelException in channel read: %s", e));
} catch (Exception ignored) {
}
return;
} catch (IOException e) {
- if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage()) // needed for
- // Solaris jdk
- // 1.4.2_08
- ) {
+ // "Socket closed" check needed for Solaris jdk 1.4.2_08
+ if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) {
if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
logger.debug("{} io exception for {}", p2pReaderName(), this, e);
}
@@ -1798,28 +1663,26 @@ public class Connection implements Runnable {
}
}
}
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
try {
- requestClose(
- String.format("IOException in channel read: %s", e));
+ requestClose(String.format("IOException in channel read: %s", e));
} catch (Exception ignored) {
}
return;
} catch (Exception e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
if (!stopped && !isSocketClosed()) {
logger.fatal(String.format("%s exception in channel read", p2pReaderName()), e);
}
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
try {
- requestClose(
- String.format("%s exception in channel read", e));
+ requestClose(String.format("%s exception in channel read", e));
} catch (Exception ignored) {
}
return;
}
- } // for
+ }
} finally {
if (!isHandShakeReader) {
synchronized (stateLock) {
@@ -1840,8 +1703,7 @@ public class Connection implements Runnable {
getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort());
int packetBufferSize = engine.getSession().getPacketBufferSize();
- if (inputBuffer == null
- || (inputBuffer.capacity() < packetBufferSize)) {
+ if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) {
// TLS has a minimum input buffer size constraint
if (inputBuffer != null) {
getBufferPool().releaseReceiveBuffer(inputBuffer);
@@ -1866,9 +1728,9 @@ public class Connection implements Runnable {
* initiate suspect processing if a shared/ordered connection is lost and we're not shutting down
*/
private void initiateSuspicionIfSharedUnordered() {
- if (this.isReceiver && this.handshakeRead && !this.preserveOrder && this.sharedResource) {
- if (!this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
- this.owner.getDM().getDistribution().suspectMember(this.getRemoteAddress(),
+ if (isReceiver && handshakeRead && !preserveOrder && sharedResource) {
+ if (!owner.getConduit().getCancelCriterion().isCancelInProgress()) {
+ owner.getDM().getDistribution().suspectMember(getRemoteAddress(),
INITIATING_SUSPECT_PROCESSING);
}
}
@@ -1889,61 +1751,61 @@ public class Connection implements Runnable {
}
msg = msg.toLowerCase();
- return (msg.contains("forcibly closed")) || (msg.contains("reset by peer"))
- || (msg.contains("connection reset"));
+ return msg.contains("forcibly closed")
+ || msg.contains("reset by peer")
+ || msg.contains("connection reset");
}
private static boolean validMsgType(int msgType) {
- return msgType == NORMAL_MSG_TYPE || msgType == CHUNKED_MSG_TYPE
+ return msgType == NORMAL_MSG_TYPE
+ || msgType == CHUNKED_MSG_TYPE
|| msgType == END_CHUNKED_MSG_TYPE;
}
private void closeAllMsgDestreamers() {
- synchronized (this.destreamerLock) {
- if (this.idleMsgDestreamer != null) {
- this.idleMsgDestreamer.close();
- this.idleMsgDestreamer = null;
- }
- if (this.destreamerMap != null) {
- Iterator it = this.destreamerMap.values().iterator();
- while (it.hasNext()) {
- MsgDestreamer md = (MsgDestreamer) it.next();
+ synchronized (destreamerLock) {
+ if (idleMsgDestreamer != null) {
+ idleMsgDestreamer.close();
+ idleMsgDestreamer = null;
+ }
+ if (destreamerMap != null) {
+ for (Object o : destreamerMap.values()) {
+ MsgDestreamer md = (MsgDestreamer) o;
md.close();
}
- this.destreamerMap = null;
+ destreamerMap = null;
}
}
}
- MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) {
- synchronized (this.destreamerLock) {
- if (this.destreamerMap == null) {
- this.destreamerMap = new HashMap();
+ private MsgDestreamer obtainMsgDestreamer(short msgId, final Version v) {
+ synchronized (destreamerLock) {
+ if (destreamerMap == null) {
+ destreamerMap = new HashMap();
}
- Short key = new Short(msgId);
- MsgDestreamer result = (MsgDestreamer) this.destreamerMap.get(key);
+ Short key = msgId;
+ MsgDestreamer result = (MsgDestreamer) destreamerMap.get(key);
if (result == null) {
- result = this.idleMsgDestreamer;
+ result = idleMsgDestreamer;
if (result != null) {
- this.idleMsgDestreamer = null;
+ idleMsgDestreamer = null;
} else {
- result = new MsgDestreamer(this.owner.getConduit().getStats(),
- this.conduit.getCancelCriterion(), v);
+ result =
+ new MsgDestreamer(owner.getConduit().getStats(), conduit.getCancelCriterion(), v);
}
result.setName(p2pReaderName() + " msgId=" + msgId);
- this.destreamerMap.put(key, result);
+ destreamerMap.put(key, result);
}
return result;
}
}
- void releaseMsgDestreamer(short msgId, MsgDestreamer md) {
- Short key = new Short(msgId);
- synchronized (this.destreamerLock) {
- this.destreamerMap.remove(key);
- if (this.idleMsgDestreamer == null) {
+ private void releaseMsgDestreamer(short msgId, MsgDestreamer md) {
+ synchronized (destreamerLock) {
+ destreamerMap.remove(msgId);
+ if (idleMsgDestreamer == null) {
md.reset();
- this.idleMsgDestreamer = md;
+ idleMsgDestreamer = md;
} else {
md.close();
}
@@ -1956,13 +1818,12 @@ public class Connection implements Runnable {
ReplySender dm = new DirectReplySender(this);
ReplyMessage.send(getRemoteAddress(), rpId, exception, dm);
} else if (rpId != 0) {
- DistributionManager dm = this.owner.getDM();
+ DistributionManager dm = owner.getDM();
dm.getExecutors().getWaitingThreadPool()
.execute(() -> ReplyMessage.send(getRemoteAddress(), rpId, exception, dm));
}
}
-
/**
* sends a serialized message to the other end of this connection. This is used by the
* DirectChannel in GemFire when the message is going to be sent to multiple recipients.
@@ -1972,20 +1833,19 @@ public class Connection implements Runnable {
void sendPreserialized(ByteBuffer buffer, boolean cacheContentChanges,
DistributionMessage msg) throws IOException, ConnectionException {
if (!connected) {
- throw new ConnectionException(
- String.format("Not connected to %s", this.remoteAddr));
+ throw new ConnectionException(String.format("Not connected to %s", remoteAddr));
}
- if (this.batchFlusher != null) {
+ if (batchFlusher != null) {
batchSend(buffer);
return;
}
- final boolean origSocketInUse = this.socketInUse;
+ final boolean origSocketInUse = socketInUse;
byte originalState;
synchronized (stateLock) {
- originalState = this.connectionState;
- this.connectionState = STATE_SENDING;
+ originalState = connectionState;
+ connectionState = STATE_SENDING;
}
- this.socketInUse = true;
+ socketInUse = true;
try {
SocketChannel channel = getSocket().getChannel();
writeFully(channel, buffer, false, msg);
@@ -1994,70 +1854,68 @@ public class Connection implements Runnable {
}
} finally {
accessed();
- this.socketInUse = origSocketInUse;
+ socketInUse = origSocketInUse;
synchronized (stateLock) {
- this.connectionState = originalState;
+ connectionState = originalState;
}
}
}
/**
- * If <code>use</code> is true then "claim" the connection for our use. If <code>use</code> is
- * false then "release" the connection. Fixes bug 37657.
- *
- * @return true if connection was already in use at time of call; false if not.
+ * If {@code use} is true then "claim" the connection for our use. If {@code use} is
+ * false then "release" the connection.
*/
- public boolean setInUse(boolean use, long startTime, long ackWaitThreshold, long ackSAThreshold,
+ public void setInUse(boolean use, long startTime, long ackWaitThreshold, long ackSAThreshold,
List connectionGroup) {
// just do the following; EVEN if the connection has been closed
- final boolean origSocketInUse = this.socketInUse;
synchronized (this) {
if (use && (ackWaitThreshold > 0 || ackSAThreshold > 0)) {
// set times that events should be triggered
- this.transmissionStartTime = startTime;
- this.ackWaitTimeout = ackWaitThreshold;
- this.ackSATimeout = ackSAThreshold;
- this.ackConnectionGroup = connectionGroup;
- this.ackThreadName = Thread.currentThread().getName();
+ transmissionStartTime = startTime;
+ ackWaitTimeout = ackWaitThreshold;
+ ackSATimeout = ackSAThreshold;
+ ackConnectionGroup = connectionGroup;
+ ackThreadName = Thread.currentThread().getName();
} else {
- this.ackWaitTimeout = 0;
- this.ackSATimeout = 0;
- this.ackConnectionGroup = null;
- this.ackThreadName = null;
+ ackWaitTimeout = 0;
+ ackSATimeout = 0;
+ ackConnectionGroup = null;
+ ackThreadName = null;
}
- synchronized (this.stateLock) {
- this.connectionState = STATE_IDLE;
+ synchronized (stateLock) {
+ connectionState = STATE_IDLE;
}
- this.socketInUse = use;
+ socketInUse = use;
}
if (!use) {
accessed();
}
- return origSocketInUse;
}
/**
* For testing we want to configure the connection without having to read a handshake
*/
+ @VisibleForTesting
void setSharedUnorderedForTest() {
- this.preserveOrder = false;
- this.sharedResource = true;
- this.handshakeRead = true;
+ preserveOrder = false;
+ sharedResource = true;
+ handshakeRead = true;
}
-
- /** ensure that a task is running to monitor transmission and reading of acks */
+ /**
+ * ensure that a task is running to monitor transmission and reading of acks
+ */
synchronized void scheduleAckTimeouts() {
if (ackTimeoutTask == null) {
- final long msAW = this.owner.getDM().getConfig().getAckWaitThreshold() * 1000L;
- final long msSA = this.owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000L;
+ final long msAW = owner.getDM().getConfig().getAckWaitThreshold() * 1000L;
+ final long msSA = owner.getDM().getConfig().getAckSevereAlertThreshold() * 1000L;
ackTimeoutTask = new SystemTimer.SystemTimerTask() {
@Override
public void run2() {
if (owner.isClosed()) {
return;
}
- byte connState = -1;
+ byte connState;
synchronized (stateLock) {
connState = connectionState;
}
@@ -2083,12 +1941,11 @@ public class Connection implements Runnable {
}
List group = ackConnectionGroup;
if (sentAlert && group != null) {
- // since transmission and ack-receipt are performed serially, we don't
- // want to complain about all receivers out just because one was slow. We therefore
- // reset
- // the time stamps and give others more time
- for (Iterator it = group.iterator(); it.hasNext();) {
- Connection con = (Connection) it.next();
+ // since transmission and ack-receipt are performed serially, we don't want to complain
+ // about all receivers out just because one was slow. We therefore reset the time stamps
+ // and give others more time
+ for (Object o : group) {
+ Connection con = (Connection) o;
if (con != Connection.this) {
con.transmissionStartTime += con.ackSATimeout;
}
@@ -2110,10 +1967,12 @@ public class Connection implements Runnable {
}
}
- /** ack-wait-threshold and ack-severe-alert-threshold processing */
+ /**
+ * ack-wait-threshold and ack-severe-alert-threshold processing
+ */
private boolean doSevereAlertProcessing() {
long now = System.currentTimeMillis();
- if (ackSATimeout > 0 && (transmissionStartTime + ackWaitTimeout + ackSATimeout) <= now) {
+ if (ackSATimeout > 0 && transmissionStartTime + ackWaitTimeout + ackSATimeout <= now) {
logger.fatal("{} seconds have elapsed waiting for a response from {} for thread {}",
(ackWaitTimeout + ackSATimeout) / 1000L,
getRemoteAddress(),
@@ -2121,17 +1980,18 @@ public class Connection implements Runnable {
// turn off subsequent checks by setting the timeout to zero, then boot the member
ackSATimeout = 0;
return true;
- } else if (!ackTimedOut && (0 < ackWaitTimeout)
- && (transmissionStartTime + ackWaitTimeout) <= now) {
+ }
+ if (!ackTimedOut && 0 < ackWaitTimeout
+ && transmissionStartTime + ackWaitTimeout <= now) {
logger.warn("{} seconds have elapsed waiting for a response from {} for thread {}",
ackWaitTimeout / 1000L, getRemoteAddress(), ackThreadName);
ackTimedOut = true;
- final String state = (connectionState == Connection.STATE_SENDING)
+ final String state = connectionState == Connection.STATE_SENDING
? "Sender has been unable to transmit a message within ack-wait-threshold seconds"
: "Sender has been unable to receive a response to a message within ack-wait-threshold seconds";
if (ackSATimeout > 0) {
- this.owner.getDM().getDistribution()
+ owner.getDM().getDistribution()
.suspectMembers(Collections.singleton(getRemoteAddress()), state);
}
}
@@ -2140,7 +2000,7 @@ public class Connection implements Runnable {
private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
throws ConnectionException {
- final DMStats stats = this.owner.getConduit().getStats();
+ final DMStats stats = owner.getConduit().getStats();
long start = DistributionStats.getStatTime();
try {
ConflationKey ck = null;
@@ -2148,10 +2008,9 @@ public class Connection implements Runnable {
ck = msg.getConflationKey();
}
Object objToQueue = null;
- // if we can conflate delay the copy to see if we can reuse
- // an already allocated buffer.
+ // if we can conflate delay the copy to see if we can reuse an already allocated buffer.
final int newBytes = buffer.remaining();
- final int origBufferPos = buffer.position(); // to fix bug 34832
+ final int origBufferPos = buffer.position();
if (ck == null || !ck.allowsConflation()) {
// do this outside of sync for multi thread perf
ByteBuffer newbb = ByteBuffer.allocate(newBytes);
@@ -2159,15 +2018,14 @@ public class Connection implements Runnable {
newbb.flip();
objToQueue = newbb;
}
- synchronized (this.outgoingQueue) {
- if (this.disconnectRequested) {
+ synchronized (outgoingQueue) {
+ if (disconnectRequested) {
buffer.position(origBufferPos);
// we have given up so just drop this message.
- throw new ConnectionException(String.format("Forced disconnect sent to %s",
- this.remoteAddr));
+ throw new ConnectionException(String.format("Forced disconnect sent to %s", remoteAddr));
}
- if (!force && !this.asyncQueuingInProgress) {
- // reset buffer since we will be sending it. This fixes bug 34832
+ if (!force && !asyncQueuingInProgress) {
+ // reset buffer since we will be sending it
buffer.position(origBufferPos);
// the pusher emptied the queue so don't add since we are not forced to.
return false;
@@ -2176,25 +2034,29 @@ public class Connection implements Runnable {
if (ck != null) {
if (ck.allowsConflation()) {
objToQueue = ck;
- Object oldValue = this.conflatedKeys.put(ck, ck);
+ Object oldValue = conflatedKeys.put(ck, ck);
if (oldValue != null) {
ConflationKey oldck = (ConflationKey) oldValue;
ByteBuffer oldBuffer = oldck.getBuffer();
// need to always do this to allow old buffer to be gc'd
oldck.setBuffer(null);
+
// remove the conflated key from current spot in queue
+
// Note we no longer remove from the queue because the search
// can be expensive on large queues. Instead we just wait for
// the queue removal code to find the oldck and ignore it since
// its buffer is null
+
// We do a quick check of the last thing in the queue
// and if it has the same identity of our last thing then
// remove it
- if (this.outgoingQueue.getLast() == oldck) {
- this.outgoingQueue.removeLast();
+
+ if (outgoingQueue.getLast() == oldck) {
+ outgoingQueue.removeLast();
}
int oldBytes = oldBuffer.remaining();
- this.queuedBytes -= oldBytes;
+ queuedBytes -= oldBytes;
stats.incAsyncQueueSize(-oldBytes);
stats.incAsyncConflatedMsgs();
didConflation = true;
@@ -2220,23 +2082,23 @@ public class Connection implements Runnable {
}
} else {
// just forget about having a conflatable operation
- /* Object removedVal = */ this.conflatedKeys.remove(ck);
+ conflatedKeys.remove(ck);
}
}
- {
- long newQueueSize = newBytes + this.queuedBytes;
- if (newQueueSize > this.asyncMaxQueueSize) {
- logger.warn("Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
- newQueueSize, this.asyncMaxQueueSize, this.remoteAddr);
- stats.incAsyncQueueSizeExceeded(1);
- disconnectSlowReceiver();
- // reset buffer since we will be sending it
- buffer.position(origBufferPos);
- return false;
- }
+
+ long newQueueSize = newBytes + queuedBytes;
+ if (newQueueSize > asyncMaxQueueSize) {
+ logger.warn("Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
+ newQueueSize, asyncMaxQueueSize, remoteAddr);
+ stats.incAsyncQueueSizeExceeded(1);
+ disconnectSlowReceiver();
+ // reset buffer since we will be sending it
+ buffer.position(origBufferPos);
+ return false;
}
- this.outgoingQueue.addLast(objToQueue);
- this.queuedBytes += newBytes;
+
+ outgoingQueue.addLast(objToQueue);
+ queuedBytes += newBytes;
stats.incAsyncQueueSize(newBytes);
if (!didConflation) {
stats.incAsyncQueuedMsgs();
@@ -2260,62 +2122,57 @@ public class Connection implements Runnable {
throws ConnectionException {
if (!addToQueue(buffer, msg, true)) {
return false;
- } else {
- startMessagePusher();
- return true;
}
+ startMessagePusher();
+ return true;
}
- private final Object pusherSync = new Object();
-
private void startMessagePusher() {
- synchronized (this.pusherSync) {
- while (this.pusherThread != null) {
+ synchronized (pusherSync) {
+ while (pusherThread != null) {
// wait for previous pusher thread to exit
boolean interrupted = Thread.interrupted();
try {
- this.pusherSync.wait(); // spurious wakeup ok
+ pusherSync.wait(); // spurious wakeup ok
} catch (InterruptedException ex) {
interrupted = true;
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
- this.asyncQueuingInProgress = true;
- this.pusherThread =
- new LoggingThread("P2P async pusher to " + this.remoteAddr, this::runMessagePusher);
- } // synchronized
- this.pusherThread.start();
+ asyncQueuingInProgress = true;
+ pusherThread = new LoggingThread("P2P async pusher to " + remoteAddr, this::runMessagePusher);
+ }
+ pusherThread.start();
}
- private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
- ByteBuffer result = null;
- final DMStats stats = this.owner.getConduit().getStats();
+ private ByteBuffer takeFromOutgoingQueue() {
+ final DMStats stats = owner.getConduit().getStats();
long start = DistributionStats.getStatTime();
try {
- synchronized (this.outgoingQueue) {
- if (this.disconnectRequested) {
+ ByteBuffer result = null;
+ synchronized (outgoingQueue) {
+ if (disconnectRequested) {
// don't bother with anymore work since we are done
- this.asyncQueuingInProgress = false;
- this.outgoingQueue.notifyAll();
+ asyncQueuingInProgress = false;
+ outgoingQueue.notifyAll();
return null;
}
- // Object o = this.outgoingQueue.poll();
do {
- if (this.outgoingQueue.isEmpty()) {
+ if (outgoingQueue.isEmpty()) {
break;
}
- Object o = this.outgoingQueue.removeFirst();
+ Object o = outgoingQueue.removeFirst();
if (o == null) {
break;
}
if (o instanceof ConflationKey) {
result = ((ConflationKey) o).getBuffer();
if (result != null) {
- this.conflatedKeys.remove(o);
+ conflatedKeys.remove(o);
} else {
// if result is null then this same key will be found later in the
// queue so we just need to skip this entry
@@ -2325,13 +2182,13 @@ public class Connection implements Runnable {
result = (ByteBuffer) o;
}
int newBytes = result.remaining();
- this.queuedBytes -= newBytes;
+ queuedBytes -= newBytes;
stats.incAsyncQueueSize(-newBytes);
stats.incAsyncDequeuedMsgs();
} while (result == null);
if (result == null) {
- this.asyncQueuingInProgress = false;
- this.outgoingQueue.notifyAll();
+ asyncQueuingInProgress = false;
+ outgoingQueue.notifyAll();
}
}
return result;
@@ -2342,50 +2199,43 @@ public class Connection implements Runnable {
}
}
- private boolean disconnectRequested = false;
-
-
/**
* @since GemFire 4.2.2
*/
private void disconnectSlowReceiver() {
- synchronized (this.outgoingQueue) {
- if (this.disconnectRequested) {
+ synchronized (outgoingQueue) {
+ if (disconnectRequested) {
// only ask once
return;
}
- this.disconnectRequested = true;
+ disconnectRequested = true;
}
- DistributionManager dm = this.owner.getDM();
+ DistributionManager dm = owner.getDM();
if (dm == null) {
- this.owner.removeEndpoint(this.remoteAddr,
- "no distribution manager");
+ owner.removeEndpoint(remoteAddr, "no distribution manager");
return;
}
- dm.getDistribution().requestMemberRemoval(this.remoteAddr,
+ dm.getDistribution().requestMemberRemoval(remoteAddr,
"Disconnected as a slow-receiver");
// Ok, we sent the message, the coordinator should kick the member out
// immediately and inform this process with a new view.
- // Let's wait
- // for that to happen and if it doesn't in X seconds
- // then remove the endpoint.
- final int FORCE_TIMEOUT = 3000;
- while (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
+ // Let's wait for that to happen and if it doesn't in X seconds then remove the endpoint.
+ while (dm.getOtherDistributionManagerIds().contains(remoteAddr)) {
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ie);
return;
}
}
- this.owner.removeEndpoint(this.remoteAddr,
+ owner.removeEndpoint(remoteAddr,
"Force disconnect timed out");
- if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) {
+ if (dm.getOtherDistributionManagerIds().contains(remoteAddr)) {
if (logger.isDebugEnabled()) {
- logger.debug("Force disconnect timed out after waiting {} seconds", (FORCE_TIMEOUT / 1000));
+ final int FORCE_TIMEOUT = 3000;
+ logger.debug("Force disconnect timed out after waiting {} seconds", FORCE_TIMEOUT / 1000);
}
- return;
}
}
@@ -2394,7 +2244,7 @@ public class Connection implements Runnable {
*/
private void runMessagePusher() {
try {
- final DMStats stats = this.owner.getConduit().getStats();
+ final DMStats stats = owner.getConduit().getStats();
final long threadStart = stats.startAsyncThread();
try {
stats.incAsyncQueues(1);
@@ -2402,10 +2252,10 @@ public class Connection implements Runnable {
try {
int flushId = 0;
- while (this.asyncQueuingInProgress && this.connected) {
+ while (asyncQueuingInProgress && connected) {
if (SystemFailure.getFailure() != null) {
// Allocate no objects here!
- Socket s = this.socket;
+ Socket s = socket;
if (s != null) {
try {
logger.debug("closing socket", new Exception("closing socket"));
@@ -2415,19 +2265,19 @@ public class Connection implements Runnable {
// don't care
}
}
- SystemFailure.checkFailure(); // throws
+ SystemFailure.checkFailure();
}
- if (this.owner.getConduit().getCancelCriterion().isCancelInProgress()) {
+ if (owner.getConduit().getCancelCriterion().isCancelInProgress()) {
break;
}
flushId++;
long flushStart = stats.startAsyncQueueFlush();
try {
- long curQueuedBytes = this.queuedBytes;
- if (curQueuedBytes > this.asyncMaxQueueSize) {
+ long curQueuedBytes = queuedBytes;
+ if (curQueuedBytes > asyncMaxQueueSize) {
logger.warn(
"Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
- curQueuedBytes, this.asyncMaxQueueSize, this.remoteAddr);
+ curQueuedBytes, asyncMaxQueueSize, remoteAddr);
stats.incAsyncQueueSizeExceeded(1);
disconnectSlowReceiver();
return;
@@ -2441,28 +2291,21 @@ public class Connection implements Runnable {
return;
}
writeFully(channel, bb, true, null);
- // We should not add messagesSent here according to Bruce.
- // The counts are increased elsewhere.
- // messagesSent++;
+ // We should not add messagesSent. The counts are increased elsewhere.
accessed();
} finally {
stats.endAsyncQueueFlush(flushStart);
}
- } // while
+ }
} finally {
// need to force this to false before doing the requestClose calls
- synchronized (this.outgoingQueue) {
- this.asyncQueuingInProgress = false;
- this.outgoingQueue.notifyAll();
+ synchronized (outgoingQueue) {
+ asyncQueuingInProgress = false;
+ outgoingQueue.notifyAll();
}
}
- } catch (InterruptedException ex) {
- // someone wants us to stop.
- // No need to set interrupt bit, we're quitting.
- // No need to throw an error, we're quitting.
} catch (IOException ex) {
- final String err =
- String.format("P2P pusher io exception for %s", this);
+ String err = String.format("P2P pusher io exception for %s", this);
if (!isSocketClosed()) {
if (logger.isDebugEnabled() && !isIgnorableIOException(ex)) {
logger.debug(err, ex);
@@ -2472,17 +2315,15 @@ public class Connection implements Runnable {
requestClose(err + ": " + ex);
} catch (Exception ignore) {
}
- } catch (CancelException ex) { // bug 37367
- final String err = String.format("P2P pusher %s caught CacheClosedException: %s",
- this, ex);
+ } catch (CancelException ex) {
+ String err = String.format("P2P pusher %s caught CacheClosedException: %s", this, ex);
logger.debug(err);
try {
requestClose(err);
} catch (Exception ignore) {
}
- return;
} catch (Exception ex) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex); // bug 37101
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
if (!isSocketClosed()) {
logger.fatal(String.format("P2P pusher exception: %s", ex), ex);
}
@@ -2491,8 +2332,8 @@ public class Connection implements Runnable {
} catch (Exception ignore) {
}
} finally {
- stats.incAsyncQueueSize(-this.queuedBytes);
- this.queuedBytes = 0;
+ stats.incAsyncQueueSize(-queuedBytes);
+ queuedBytes = 0;
stats.endAsyncThread(threadStart);
stats.incAsyncThreads(-1);
stats.incAsyncQueues(-1);
@@ -2502,9 +2343,9 @@ public class Connection implements Runnable {
}
}
} finally {
- synchronized (this.pusherSync) {
- this.pusherThread = null;
- this.pusherSync.notify();
+ synchronized (pusherSync) {
+ pusherThread = null;
+ pusherSync.notifyAll();
}
}
}
@@ -2519,34 +2360,26 @@ public class Connection implements Runnable {
}
// only use sync writes if:
// we are already queuing
- if (this.asyncQueuingInProgress) {
+ if (asyncQueuingInProgress) {
// it will just tack this msg onto the outgoing queue
return true;
}
// or we are a receiver
- if (this.isReceiver) {
+ if (isReceiver) {
return true;
}
// or we are an unordered connection
- if (!this.preserveOrder) {
+ if (!preserveOrder) {
return true;
}
// or the receiver does not allow queuing
- if (this.asyncDistributionTimeout == 0) {
+ if (asyncDistributionTimeout == 0) {
return true;
}
// OTHERWISE return false and let caller send async
return false;
}
- /**
- * If true then act as if the socket buffer is full and start async queuing
- */
- @MutableForTesting
- public static volatile boolean FORCE_ASYNC_QUEUE = false;
-
- private static final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 2)
-
private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
DistributionMessage p_msg, final DMStats stats) throws IOException {
DistributionMessage msg = p_msg;
@@ -2556,10 +2389,10 @@ public class Connection implements Runnable {
int retries = 0;
int totalAmtWritten = 0;
try {
- synchronized (this.outLock) {
+ synchronized (outLock) {
if (!forceAsync) {
// check one more time while holding outLock in case a pusher was created
- if (this.asyncQueuingInProgress) {
+ if (asyncQueuingInProgress) {
if (addToQueue(buffer, msg, false)) {
return;
}
@@ -2569,19 +2402,19 @@ public class Connection implements Runnable {
socketWriteStarted = true;
startSocketWrite = stats.startSocketWrite(false);
long now = System.currentTimeMillis();
- int waitTime = 1;
long distributionTimeoutTarget = 0;
// if asyncDistributionTimeout == 1 then we want to start queuing
// as soon as we do a non blocking socket write that returns 0
- if (this.asyncDistributionTimeout != 1) {
- distributionTimeoutTarget = now + this.asyncDistributionTimeout;
+ if (asyncDistributionTimeout != 1) {
+ distributionTimeoutTarget = now + asyncDistributionTimeout;
}
- long queueTimeoutTarget = now + this.asyncQueueTimeout;
+ long queueTimeoutTarget = now + asyncQueueTimeout;
channel.configureBlocking(false);
try {
ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
+ int waitTime = 1;
do {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
retries++;
int amtWritten;
if (FORCE_ASYNC_QUEUE) {
@@ -2600,10 +2433,10 @@ public class Connection implements Runnable {
"Starting async pusher to handle async queue because distribution-timeout is 1 and the last socket write would have blocked.");
} else {
long blockedMs = now - distributionTimeoutTarget;
- blockedMs += this.asyncDistributionTimeout;
+ blockedMs += asyncDistributionTimeout;
logger.debug(
"Blocked for {}ms which is longer than the max of {}ms so starting async pusher to handle async queue.",
- blockedMs, this.asyncDistributionTimeout);
+ blockedMs, asyncDistributionTimeout);
}
}
stats.incAsyncDistributionTimeoutExceeded();
@@ -2621,32 +2454,30 @@ public class Connection implements Runnable {
timeoutTarget = distributionTimeoutTarget;
} else {
boolean disconnectNeeded = false;
- long curQueuedBytes = this.queuedBytes;
- if (curQueuedBytes > this.asyncMaxQueueSize) {
+ long curQueuedBytes = queuedBytes;
+ if (curQueuedBytes > asyncMaxQueueSize) {
logger.warn(
"Queued bytes {} exceeds max of {}, asking slow receiver {} to disconnect.",
- curQueuedBytes,
- this.asyncMaxQueueSize, this.remoteAddr);
+ curQueuedBytes, asyncMaxQueueSize, remoteAddr);
stats.incAsyncQueueSizeExceeded(1);
disconnectNeeded = true;
}
if (now > queueTimeoutTarget) {
- // we have waited long enough
- // the pusher has been idle too long!
+ // we have waited long enough; the pusher has been idle too long!
long blockedMs = now - queueTimeoutTarget;
- blockedMs += this.asyncQueueTimeout;
+ blockedMs += asyncQueueTimeout;
logger.warn(
"Blocked for {}ms which is longer than the max of {}ms, asking slow receiver {} to disconnect.",
blockedMs,
- this.asyncQueueTimeout, this.remoteAddr);
+ asyncQueueTimeout, remoteAddr);
stats.incAsyncQueueTimeouts(1);
disconnectNeeded = true;
}
if (disconnectNeeded) {
disconnectSlowReceiver();
- synchronized (this.outgoingQueue) {
- this.asyncQueuingInProgress = false;
- this.outgoingQueue.notifyAll(); // for bug 42330
+ synchronized (outgoingQueue) {
+ asyncQueuingInProgress = false;
+ outgoingQueue.notifyAll();
}
return;
}
@@ -2669,7 +2500,7 @@ public class Connection implements Runnable {
Thread.sleep(msToWait);
} catch (InterruptedException ex) {
interrupted = true;
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
@@ -2685,7 +2516,7 @@ public class Connection implements Runnable {
else {
totalAmtWritten += amtWritten;
// reset queueTimeoutTarget since we made some progress
- queueTimeoutTarget = System.currentTimeMillis() + this.asyncQueueTimeout;
+ queueTimeoutTarget = System.currentTimeMillis() + asyncQueueTimeout;
waitTime = 1;
}
} while (wrappedBuffer.remaining() > 0);
@@ -2709,23 +2540,24 @@ public class Connection implements Runnable {
* @param forceAsync true if we need to force a blocking async write.
* @throws ConnectionException if the conduit has stopped
*/
+ @VisibleForTesting
void writeFully(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
DistributionMessage msg) throws IOException, ConnectionException {
- final DMStats stats = this.owner.getConduit().getStats();
- if (!this.sharedResource) {
+ final DMStats stats = owner.getConduit().getStats();
+ if (!sharedResource) {
stats.incTOSentMsg();
}
if (useSyncWrites(forceAsync)) {
- if (this.asyncQueuingInProgress) {
+ if (asyncQueuingInProgress) {
if (addToQueue(buffer, msg, false)) {
return;
}
// fall through
}
long startLock = stats.startSocketLock();
- synchronized (this.outLock) {
+ synchronized (outLock) {
stats.endSocketLock(startLock);
- if (this.asyncQueuingInProgress) {
+ if (asyncQueuingInProgress) {
if (addToQueue(buffer, msg, false)) {
return;
}
@@ -2742,18 +2574,20 @@ public class Connection implements Runnable {
}
}
- } // synchronized
+ }
} else {
writeAsync(channel, buffer, forceAsync, msg, stats);
}
}
- /** gets the buffer for receiving message length bytes */
+ /**
+ * gets the buffer for receiving message length bytes
+ */
private ByteBuffer getInputBuffer() {
if (inputBuffer == null) {
- int allocSize = this.recvBufferSize;
+ int allocSize = recvBufferSize;
if (allocSize == -1) {
- allocSize = this.owner.getConduit().tcpBufferSize;
+ allocSize = owner.getConduit().tcpBufferSize;
}
inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize);
}
@@ -2761,47 +2595,20 @@ public class Connection implements Runnable {
}
/**
- * stateLock is used to synchronize state changes.
- */
- private final Object stateLock = new Object();
-
- /** for timeout processing, this is the current state of the connection */
- private byte connectionState = STATE_IDLE;
-
- /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
- /** the connection is idle, but may be in use */
- private static final byte STATE_IDLE = 0;
- /** the connection is in use and is transmitting data */
- private static final byte STATE_SENDING = 1;
- /** the connection is in use and is done transmitting */
- private static final byte STATE_POST_SENDING = 2;
- /** the connection is in use and is reading a direct-ack */
- private static final byte STATE_READING_ACK = 3;
- /** the connection is in use and has finished reading a direct-ack */
- private static final byte STATE_RECEIVED_ACK = 4;
- /** the connection is in use and is reading a message */
- private static final byte STATE_READING = 5;
- /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
-
- /** set to true if we exceeded the ack-wait-threshold waiting for a response */
- private volatile boolean ackTimedOut;
-
- /**
* @throws SocketTimeoutException if wait expires.
* @throws ConnectionException if ack is not received
*/
public void readAck(final DirectReplyProcessor processor)
throws SocketTimeoutException, ConnectionException {
if (isSocketClosed()) {
- throw new ConnectionException(
- "connection is closed");
+ throw new ConnectionException("connection is closed");
}
- synchronized (this.stateLock) {
- this.connectionState = STATE_READING_ACK;
+ synchronized (stateLock) {
+ connectionState = STATE_READING_ACK;
}
- boolean origSocketInUse = this.socketInUse;
- this.socketInUse = true;
+ boolean origSocketInUse = socketInUse;
+ socketInUse = true;
MsgReader msgReader = null;
DMStats stats = owner.getConduit().getStats();
final Version version = getRemoteVersion();
@@ -2810,7 +2617,7 @@ public class Connection implements Runnable {
Header header = msgReader.readHeader();
- ReplyMessage msg = null;
+ ReplyMessage msg;
int len;
if (header.getMessageType() == NORMAL_MSG_TYPE) {
msg = (ReplyMessage) msgReader.readMessage(header);
@@ -2832,7 +2639,7 @@ public class Connection implements Runnable {
// about performance, we'll skip those checks. Skipping them
// should be legit, because we just sent a message so we know
// the member is already in our view, etc.
- ClusterDistributionManager dm = (ClusterDistributionManager) owner.getDM();
+ DistributionManager dm = owner.getDM();
msg.setBytesRead(len);
msg.setSender(remoteAddr);
stats.incReceivedMessages(1L);
@@ -2854,39 +2661,36 @@ public class Connection implements Runnable {
}
try {
requestClose(err + ": " + e);
- } catch (Exception ex) {
+ } catch (Exception ignored) {
}
- throw new ConnectionException(
- String.format("Unable to read direct ack because: %s", e));
+ throw new ConnectionException(String.format("Unable to read direct ack because: %s", e));
} catch (ConnectionException e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
throw e;
} catch (Exception e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
if (!isSocketClosed()) {
logger.fatal("ack read exception", e);
}
try {
requestClose(String.format("ack read exception: %s", e));
- } catch (Exception ex) {
+ } catch (Exception ignored) {
}
- throw new ConnectionException(
- String.format("Unable to read direct ack because: %s", e));
+ throw new ConnectionException(String.format("Unable to read direct ack because: %s", e));
} finally {
stats.incProcessedMessages(1L);
accessed();
- this.socketInUse = origSocketInUse;
- if (this.ackTimedOut) {
- logger.info("Finished waiting for reply from {}",
- getRemoteAddress());
- this.ackTimedOut = false;
+ socketInUse = origSocketInUse;
+ if (ackTimedOut) {
+ logger.info("Finished waiting for reply from {}", getRemoteAddress());
+ ackTimedOut = false;
}
if (msgReader != null) {
msgReader.close();
}
}
synchronized (stateLock) {
- this.connectionState = STATE_RECEIVED_ACK;
+ connectionState = STATE_RECEIVED_ACK;
}
}
@@ -2895,7 +2699,6 @@ public class Connection implements Runnable {
* deserialized and passed to TCPConduit for further processing
*/
private void processInputBuffer() throws ConnectionException, IOException {
-
inputBuffer.flip();
ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer);
@@ -2904,7 +2707,7 @@ public class Connection implements Runnable {
boolean done = false;
while (!done && connected) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
int remaining = peerDataBuffer.remaining();
if (lengthSet || remaining >= MSG_HEADER_BYTES) {
if (!lengthSet) {
@@ -2922,7 +2725,7 @@ public class Connection implements Runnable {
int oldLimit = peerDataBuffer.limit();
peerDataBuffer.limit(startPos + messageLength);
- if (this.handshakeRead) {
+ if (handshakeRead) {
try {
readMessage(peerDataBuffer);
} catch (SerializationException e) {
@@ -2932,16 +2735,15 @@ public class Connection implements Runnable {
} else {
ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer);
DataInputStream dis = new DataInputStream(bbis);
- if (!this.isReceiver) {
+ if (!isReceiver) {
// we read the handshake and then stop processing since we don't want
// to process the input buffer anymore in a handshake thread
readHandshakeForSender(dis, peerDataBuffer);
return;
- } else {
- if (readHandshakeForReceiver(dis)) {
- ioFilter.doneReading(peerDataBuffer);
- return;
- }
+ }
+ if (readHandshakeForReceiver(dis)) {
+ ioFilter.doneReading(peerDataBuffer);
+ return;
}
}
if (!connected) {
@@ -2965,7 +2767,7 @@ public class Connection implements Runnable {
}
}
- private boolean readHandshakeForReceiver(DataInputStream dis) {
+ private boolean readHandshakeForReceiver(DataInput dis) {
try {
byte b = dis.readByte();
if (b != 0) {
@@ -2981,29 +2783,26 @@ public class Connection implements Runnable {
"Detected wrong version of GemFire product during handshake. Expected %s but found %s",
HANDSHAKE_VERSION, handshakeByte));
}
- InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis);
- setRemoteAddr(remote);
- this.sharedResource = dis.readBoolean();
- this.preserveOrder = dis.readBoolean();
- this.uniqueId = dis.readLong();
+ remoteAddr = DSFIDFactory.readInternalDistributedMember(dis);
+ sharedResource = dis.readBoolean();
+ preserveOrder = dis.readBoolean();
+ uniqueId = dis.readLong();
// read the product version ordinal for on-the-fly serialization
// transformations (for rolling upgrades)
- this.remoteVersion = Version.readVersion(dis, true);
+ remoteVersion = Version.readVersion(dis, true);
int dominoNumber = 0;
- if (this.remoteVersion == null
- || (this.remoteVersion.compareTo(Version.GFE_80) >= 0)) {
+ if (remoteVersion == null
+ || remoteVersion.compareTo(Version.GFE_80) >= 0) {
dominoNumber = dis.readInt();
- if (this.sharedResource) {
+ if (sharedResource) {
dominoNumber = 0;
}
dominoCount.set(dominoNumber);
- // this.senderName = dis.readUTF();
}
- if (!this.sharedResource) {
+ if (!sharedResource) {
if (tipDomino()) {
- logger.info(
- "thread owned receiver forcing itself to send on thread owned sockets");
- // bug #49565 - if domino count is >= 2 use shared resources.
+ logger.info("thread owned receiver forcing itself to send on thread owned sockets");
+ // if domino count is >= 2 use shared resources.
// Also see DistributedCacheOperation#supportsDirectAck
} else { // if (dominoNumber < 2) {
ConnectionTable.threadWantsOwnResources();
@@ -3012,62 +2811,54 @@ public class Connection implements Runnable {
"thread-owned receiver with domino count of {} will prefer sending on thread-owned sockets",
dominoNumber);
}
- // } else {
- // ConnectionTable.threadWantsSharedResources();
}
- this.conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
- // Because this thread is not shared resource, it will be used for direct
- // ack. Direct ack messages can be large. This call will resize the send
- // buffer.
- setSendBufferSize(this.socket);
+ conduit.getStats().incThreadOwnedReceivers(1L, dominoNumber);
+ // Because this thread is not a shared resource, it will be used for direct ack.
+ // Direct ack messages can be large. This call will resize the send buffer.
+ setSendBufferSize(socket);
}
- // String name = owner.getDM().getConfig().getName();
- // if (name == null) {
- // name = "pid="+OSProcess.getId();
- // }
setThreadName(dominoNumber);
} catch (Exception e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(e); // bug 37101
logger.fatal("Error deserializing P2P handshake message", e);
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
requestClose("Error deserializing P2P handshake message");
return true;
}
if (logger.isDebugEnabled()) {
- logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr,
- (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : ""));
+ logger.debug("P2P handshake remoteAddr is {}{}", remoteAddr,
+ remoteVersion != null ? " (" + remoteVersion + ')' : "");
}
try {
- String authInit = System.getProperty(
- DistributionConfigImpl.SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
- boolean isSecure = authInit != null && authInit.length() != 0;
+ String authInit = System.getProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT);
+ boolean isSecure = authInit != null && !authInit.isEmpty();
if (isSecure) {
- if (owner.getConduit().waitForMembershipCheck(this.remoteAddr)) {
- sendOKHandshakeReply(); // fix for bug 33224
+ if (owner.getConduit().waitForMembershipCheck(remoteAddr)) {
+ sendOKHandshakeReply();
notifyHandshakeWaiter(true);
} else {
- // ARB: check if we need notifyHandshakeWaiter() call.
+ // check if we need notifyHandshakeWaiter() call.
notifyHandshakeWaiter(false);
logger.warn("{} timed out during a membership check.",
p2pReaderName());
return true;
}
} else {
- sendOKHandshakeReply(); // fix for bug 33224
+ sendOKHandshakeReply();
try {
notifyHandshakeWaiter(true);
} catch (Exception e) {
logger.fatal("Uncaught exception from listener", e);
}
}
- this.finishedConnecting = true;
+ finishedConnecting = true;
} catch (IOException ex) {
final String err = "Failed sending handshake reply";
if (logger.isDebugEnabled()) {
logger.debug(err, ex);
}
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
requestClose(err + ": " + ex);
return true;
}
@@ -3084,13 +2875,13 @@ public class Connection implements Runnable {
messageId = peerDataBuffer.getShort();
directAck = (messageType & DIRECT_ACK_BIT) != 0;
if (directAck) {
- messageType &= ~DIRECT_ACK_BIT; // clear the ack bit
+ // clear the ack bit
+ messageType &= ~DIRECT_ACK_BIT;
}
- // Following validation fixes bug 31145
if (!validMsgType(messageType)) {
Integer nioMessageTypeInteger = (int) messageType;
logger.fatal("Unknown P2P message type: {}", nioMessageTypeInteger);
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
requestClose(String.format("Unknown P2P message type: %s",
nioMessageTypeInteger));
return true;
@@ -3104,16 +2895,16 @@ public class Connection implements Runnable {
private void readMessage(ByteBuffer peerDataBuffer) {
if (messageType == NORMAL_MSG_TYPE) {
- this.owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
+ owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength);
ByteBufferInputStream bbis =
remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer)
: new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion);
- DistributionMessage msg;
try {
ReplyProcessor21.initMessageRPId();
// add serialization stats
- long startSer = this.owner.getConduit().getStats().startMsgDeserialization();
+ long startSer = owner.getConduit().getStats().startMsgDeserialization();
int startingPosition = peerDataBuffer.position();
+ DistributionMessage msg;
try {
msg = (DistributionMessage) InternalDataSerializer.readDSFID(bbis);
} catch (SerializationException e) {
@@ -3123,19 +2914,19 @@ public class Connection implements Runnable {
peerDataBuffer.capacity(), messageLength);
throw e;
}
- this.owner.getConduit().getStats().endMsgDeserialization(startSer);
+ owner.getConduit().getStats().endMsgDeserialization(startSer);
if (bbis.available() != 0) {
- logger.warn("Message deserialization of {} did not read {} bytes.",
- msg, bbis.available());
+ logger.warn("Message deserialization of {} did not read {} bytes.", msg,
+ bbis.available());
}
try {
if (!dispatchMessage(msg, messageLength, directAck)) {
directAck = false;
}
} catch (MemberShunnedException e) {
- directAck = false; // don't respond (bug39117)
+ directAck = false; // don't respond
} catch (Exception de) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
logger.fatal("Error dispatching message", de);
} catch (ThreadDeath td) {
throw td;
@@ -3166,8 +2957,7 @@ public class Connection implements Runnable {
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
- sendFailureReply(ReplyProcessor21.getMessageRPId(),
- "Error deserializing message", t,
+ sendFailureReply(ReplyProcessor21.getMessageRPId(), "Error deserializing message", t,
directAck);
if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
@@ -3184,15 +2974,16 @@ public class Connection implements Runnable {
}
} else if (messageType == CHUNKED_MSG_TYPE) {
MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
- this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+ owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
messageLength);
try {
md.addChunk(peerDataBuffer, messageLength);
} catch (IOException ex) {
+ // ignored
}
} else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
MsgDestreamer md = obtainMsgDestreamer(messageId, remoteVersion);
- this.owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
+ owner.getConduit().getStats().incMessagesBeingReceived(md.size() == 0,
messageLength);
try {
md.addChunk(peerDataBuffer, messageLength);
@@ -3208,20 +2999,20 @@ public class Connection implements Runnable {
try {
msg = md.getMessage();
} catch (ClassNotFoundException ex) {
- this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+ owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = "ClassNotFound deserializing message";
failureEx = ex;
rpId = md.getRPid();
logger.fatal("ClassNotFound deserializing message: {}", ex.toString());
} catch (IOException ex) {
- this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+ owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = "IOException deserializing message";
failureEx = ex;
rpId = md.getRPid();
logger.fatal("IOException deserializing message", failureEx);
} catch (InterruptedException ex) {
interrupted = true;
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
@@ -3234,13 +3025,12 @@ public class Connection implements Runnable {
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
- this.owner.getConduit().getStats().decMessagesBeingReceived(md.size());
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(ex);
+ owner.getConduit().getStats().decMessagesBeingReceived(md.size());
failureMsg = "Unexpected failure deserializing message";
failureEx = ex;
rpId = md.getRPid();
- logger.fatal("Unexpected failure deserializing message",
- failureEx);
+ logger.fatal("Unexpected failure deserializing message", failureEx);
} finally {
msgLength = md.size();
releaseMsgDestreamer(messageId, md);
@@ -3257,7 +3047,7 @@ public class Connection implements Runnable {
// not a member anymore - don't reply
directAck = false;
} catch (Exception de) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(de);
logger.fatal("Error dispatching message", de);
} catch (ThreadDeath td) {
throw td;
@@ -3283,49 +3073,47 @@ public class Connection implements Runnable {
void readHandshakeForSender(DataInputStream dis, ByteBuffer peerDataBuffer) {
try {
- this.replyCode = dis.readUnsignedByte();
+ int replyCode = dis.readUnsignedByte();
switch (replyCode) {
case REPLY_CODE_OK:
ioFilter.doneReading(peerDataBuffer);
notifyHandshakeWaiter(true);
return;
case REPLY_CODE_OK_WITH_ASYNC_INFO:
- this.asyncDistributionTimeout = dis.readInt();
- this.asyncQueueTimeout = dis.readInt();
- this.asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
- if (this.asyncDistributionTimeout != 0) {
+ asyncDistributionTimeout = dis.readInt();
+ asyncQueueTimeout = dis.readInt();
+ asyncMaxQueueSize = (long) dis.readInt() * (1024 * 1024);
+ if (asyncDistributionTimeout != 0) {
logger.info("{} async configuration received {}.",
p2pReaderName(),
- " asyncDistributionTimeout=" + this.asyncDistributionTimeout
- + " asyncQueueTimeout=" + this.asyncQueueTimeout
+ " asyncDistributionTimeout=" + asyncDistributionTimeout
+ + " asyncQueueTimeout=" + asyncQueueTimeout
+ " asyncMaxQueueSize="
- + (this.asyncMaxQueueSize / (1024 * 1024)));
+ + asyncMaxQueueSize / (1024 * 1024));
}
// read the product version ordinal for on-the-fly serialization
// transformations (for rolling upgrades)
- this.remoteVersion = Version.readVersion(dis, true);
+ remoteVersion = Version.readVersion(dis, true);
ioFilter.doneReading(peerDataBuffer);
notifyHandshakeWaiter(true);
return;
default:
String err =
- String.format("Unknown handshake reply code: %s messageLength: %s", this.replyCode,
- this.messageLength);
- if (replyCode == 0 && logger.isDebugEnabled()) { // bug 37113
+ String.format("Unknown handshake reply code: %s messageLength: %s", replyCode,
+ messageLength);
+ if (replyCode == 0 && logger.isDebugEnabled()) {
logger.debug(err + " (peer probably departed ungracefully)");
} else {
logger.fatal(err);
}
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
requestClose(err);
- return;
}
} catch (Exception e) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(e);
logger.fatal("Error deserializing P2P handshake reply", e);
- this.readerShuttingDown = true;
+ readerShuttingDown = true;
requestClose("Error deserializing P2P handshake reply");
- return;
} catch (ThreadDeath td) {
throw td;
} catch (VirtualMachineError err) {
@@ -3340,24 +3128,21 @@ public class Connection implements Runnable {
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
- logger.fatal("Throwable deserializing P2P handshake reply",
- t);
- this.readerShuttingDown = true;
+ logger.fatal("Throwable deserializing P2P handshake reply", t);
+ readerShuttingDown = true;
requestClose("Throwable deserializing P2P handshake reply");
- return;
}
}
private void setThreadName(int dominoNumber) {
- Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + this.remoteAddr + " "
- + (this.sharedResource ? "" : "un") + "shared" + " " + (this.preserveOrder ? "" : "un")
- + "ordered" + " uid=" + this.uniqueId + (dominoNumber > 0 ? (" dom #" + dominoNumber) : "")
- + " port=" + this.socket.getPort());
+ Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " "
+ + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un")
+ + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "")
+ + " port=" + socket.getPort());
}
private void compactOrResizeBuffer(int messageLength) {
final int oldBufferSize = inputBuffer.capacity();
- final MembershipStatistics stats = this.owner.getConduit().getStats();
int allocSize = messageLength + MSG_HEADER_BYTES;
if (oldBufferSize < allocSize) {
// need a bigger buffer
@@ -3390,7 +3175,7 @@ public class Connection implements Runnable {
"We were asked to send a direct reply on a shared socket");
msg.setReplySender(new DirectReplySender(this));
}
- this.owner.getConduit().messageReceived(this, msg, bytesRead);
+ owner.getConduit().messageReceived(this, msg, bytesRead);
return true;
} finally {
if (msg.containsRegionContentChange()) {
@@ -3399,60 +3184,52 @@ public class Connection implements Runnable {
}
}
- protected TCPConduit getConduit() {
- return this.conduit;
+ TCPConduit getConduit() {
+ return conduit;
}
protected Socket getSocket() throws SocketException {
- // fix for bug 37286
- Socket result = this.socket;
+ Socket result = socket;
if (result == null) {
- throw new SocketException(
- "socket has been closed");
+ throw new SocketException("socket has been closed");
}
return result;
}
boolean isSocketClosed() {
- return this.socket.isClosed() || !this.socket.isConnected();
+ return socket.isClosed() || !socket.isConnected();
}
boolean isReceiverStopped() {
- return this.stopped;
+ return stopped;
}
private boolean isSocketInUse() {
- return this.socketInUse;
+ return socketInUse;
}
-
protected void accessed() {
- this.accessed = true;
+ accessed = true;
}
/**
* return the DM id of the member on the other side of this connection.
*/
public InternalDistributedMember getRemoteAddress() {
- return this.remoteAddr;
+ return remoteAddr;
}
/**
* Return the version of the member on the other side of this connection.
*/
Version getRemoteVersion() {
- return this.remoteVersion;
+ return remoteVersion;
}
@Override
public String toString() {
- return String.valueOf(remoteAddr) + '@' + this.uniqueId
- + (this.remoteVersion != null ? ('(' + this.remoteVersion.toString() + ')')
- : "") /*
- * DEBUG + " accepted=" + this.isReceiver + " connected=" + this.connected +
- * " hash=" + System.identityHashCode(this) + " preserveOrder=" +
- * this.preserveOrder + " closing=" + isClosing() + ">"
- */;
+ return String.valueOf(remoteAddr) + '@' + uniqueId
+ + (remoteVersion != null ? '(' + remoteVersion.toString() + ')' : "");
}
/**
@@ -3462,7 +3239,7 @@ public class Connection implements Runnable {
* @since GemFire 5.1
*/
boolean getOriginatedHere() {
- return !this.isReceiver;
+ return !isReceiver;
}
/**
@@ -3476,7 +3253,7 @@ public class Connection implements Runnable {
* answers the unique ID of this connection in the originating VM
*/
protected long getUniqueId() {
- return this.uniqueId;
+ return uniqueId;
}
/**
@@ -3494,9 +3271,8 @@ public class Connection implements Runnable {
}
public void acquireSendPermission() throws ConnectionException {
- if (!this.connected) {
- throw new ConnectionException(
- "connection is closed");
+ if (!connected) {
+ throw new ConnectionException("connection is closed");
}
if (isReaderThread()) {
// reader threads send replies and we always want to permit those without waiting
@@ -3505,24 +3281,23 @@ public class Connection implements Runnable {
boolean interrupted = false;
try {
for (;;) {
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
try {
- this.senderSem.acquire();
+ senderSem.acquire();
break;
} catch (InterruptedException ex) {
interrupted = true;
}
- } // for
+ }
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
- if (!this.connected) {
- this.senderSem.release();
- this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); // bug 37101
- throw new ConnectionException(
- "connection is closed");
+ if (!connected) {
+ senderSem.release();
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ throw new ConnectionException("connection is closed");
}
}
@@ -3530,7 +3305,7 @@ public class Connection implements Runnable {
if (isReaderThread()) {
return;
}
- this.senderSem.release();
+ senderSem.release();
}
private void closeSenderSem() {
@@ -3542,4 +3317,105 @@ public class Connection implements Runnable {
releaseSendPermission();
}
+ private class BatchBufferFlusher extends Thread {
+
+ private volatile boolean flushNeeded;
+ private volatile boolean timeToStop;
+ private final DMStats stats;
+
+ BatchBufferFlusher() {
+ setDaemon(true);
+ stats = owner.getConduit().getStats();
+ }
+
+ /**
+ * Called when a message writer needs the current fillBatchBuffer flushed
+ */
+ void flushBuffer(ByteBuffer bb) {
+ final long start = DistributionStats.getStatTime();
+ try {
+ synchronized (this) {
+ synchronized (batchLock) {
+ if (bb != fillBatchBuffer) {
+ // it must have already been flushed. So just return and use the new fillBatchBuffer
+ return;
+ }
+ }
+ flushNeeded = true;
+ notifyAll();
+ }
+ synchronized (batchLock) {
+ // Wait for the flusher thread
+ while (bb == fillBatchBuffer) {
+ owner.getConduit().getCancelCriterion().checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+ batchLock.wait(); // spurious wakeup ok
+ } catch (InterruptedException ex) {
+ interrupted = true;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ } finally {
+ owner.getConduit().getStats().incBatchWaitTime(start);
+ }
+ }
+
+ public void close() {
+ synchronized (this) {
+ timeToStop = true;
+ flushNeeded = true;
+ notifyAll();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ synchronized (this) {
+ while (!timeToStop) {
+ if (!flushNeeded && fillBatchBuffer.position() <= BATCH_BUFFER_SIZE / 2) {
+ wait(BATCH_FLUSH_MS); // spurious wakeup ok
+ }
+ if (flushNeeded || fillBatchBuffer.position() > BATCH_BUFFER_SIZE / 2) {
+ final long start = DistributionStats.getStatTime();
+ synchronized (batchLock) {
+ // This is the only block of code that will swap the buffer references
+ flushNeeded = false;
+ ByteBuffer tmp = fillBatchBuffer;
+ fillBatchBuffer = sendBatchBuffer;
+ sendBatchBuffer = tmp;
+ batchLock.notifyAll();
+ }
+ // We now own the sendBatchBuffer
+ if (sendBatchBuffer.position() > 0) {
+ final boolean origSocketInUse = socketInUse;
+ socketInUse = true;
+ try {
+ sendBatchBuffer.flip();
+ SocketChannel channel = getSocket().getChannel();
+ writeFully(channel, sendBatchBuffer, false, null);
+ sendBatchBuffer.clear();
+ } catch (IOException | ConnectionException ex) {
+ logger.fatal("Exception flushing batch send buffer: %s", ex);
+ readerShuttingDown = true;
+ requestClose(String.format("Exception flushing batch send buffer: %s", ex));
+ } finally {
+ accessed();
+ socketInUse = origSocketInUse;
+ }
+ }
+ stats.incBatchFlushTime(start);
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+ // time for this thread to shutdown
+ }
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index a033d5f..309f9fd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -14,18 +14,19 @@
*/
package org.apache.geode.internal.tcp;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@@ -39,11 +40,10 @@ import org.apache.geode.alerting.internal.spi.AlertingAction;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
-import org.apache.geode.distributed.internal.Distribution;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.gms.GMSMembership;
+import org.apache.geode.distributed.internal.membership.gms.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.gms.api.Membership;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.SystemTimer;
@@ -54,50 +54,45 @@ import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
- * <p>
* ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe
* between two endpoints represented by generic DistributedMembers.
- * </p>
*
* @since GemFire 2.1
*/
public class ConnectionTable {
private static final Logger logger = LogService.getLogger();
- /** warning when descriptor limit reached */
+ /**
+ * warning when descriptor limit reached
+ */
@MakeNotStatic
private static boolean ulimitWarningIssued;
/**
* true if the current thread wants non-shared resources
*/
- @MakeNotStatic
- private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
+ private static final ThreadLocal<Boolean> threadWantsOwnResources = new ThreadLocal<>();
/**
* Used for messages whose order must be preserved Only connections used for sending messages, and
* receiving acks, will be put in this map.
*/
- protected final Map orderedConnectionMap = new ConcurrentHashMap();
+ private final Map orderedConnectionMap = new ConcurrentHashMap();
/**
* ordered connections local to this thread. Note that accesses to the resulting map must be
* synchronized because of static cleanup.
*/
- // ThreadLocal<Map>
- protected final ThreadLocal<Map> threadOrderedConnMap;
+ static final ThreadLocal<Map> threadOrderedConnMap = new ThreadLocal<>();
/**
- * List of thread-owned ordered connection maps, for cleanup
- *
- * Accesses to the maps in this list need to be synchronized on their instance.
+ * List of thread-owned ordered connection maps, for cleanup. Accesses to the maps in this list
+ * need to be synchronized on their instance.
*/
private final List threadConnMaps;
/**
- * Timer to kill idle threads
- *
- * guarded.By this
+ * Timer to kill idle threads. Guarded by this.
*/
private SystemTimer idleConnTimer;
@@ -112,13 +107,11 @@ public class ConnectionTable {
* Used for all non-ordered messages. Only connections used for sending messages, and receiving
* acks, will be put in this map.
*/
- protected final Map unorderedConnectionMap = new ConcurrentHashMap();
+ private final Map unorderedConnectionMap = new ConcurrentHashMap();
/**
* Used for all accepted connections. These connections are read only; we never send messages,
- * except for acks; only receive.
- *
- * Consists of a list of Connection
+ * except for acks; only receive. Consists of a list of Connection.
*/
private final List receivers = new ArrayList();
@@ -132,7 +125,7 @@ public class ConnectionTable {
/**
* true if this table is no longer in use
*/
- private volatile boolean closed = false;
+ private volatile boolean closed;
/**
* Executor used by p2p reader and p2p handshaker threads.
@@ -143,13 +136,14 @@ public class ConnectionTable {
* minutes).
*/
private static final long READER_POOL_KEEP_ALIVE_TIME =
- Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+ Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120);
private final SocketCloser socketCloser;
/**
* The most recent instance to be created
*
+ * <p>
* TODO this assumes no more than one instance is created at a time?
*/
@MakeNotStatic
@@ -158,7 +152,7 @@ public class ConnectionTable {
/**
* A set of sockets that are in the process of being connected
*/
- private Map connectingSockets = new HashMap();
+ private final Map connectingSockets = new HashMap();
/**
* Cause calling thread to share communication resources with other threads.
@@ -178,7 +172,7 @@ public class ConnectionTable {
/**
* Returns true if calling thread owns its own communication resources.
*/
- boolean threadOwnsResources() {
+ private boolean threadOwnsResources() {
DistributionManager d = getDM();
if (d != null) {
return d.getSystem().threadOwnsResources() && !AlertingAction.isThreadAlerting();
@@ -187,95 +181,87 @@ public class ConnectionTable {
}
public static Boolean getThreadOwnsResourcesRegistration() {
- return (Boolean) threadWantsOwnResources.get();
+ return threadWantsOwnResources.get();
}
public TCPConduit getOwner() {
return owner;
}
+ public static ConnectionTable create(TCPConduit conduit) {
+ ConnectionTable ct = new ConnectionTable(conduit);
+ lastInstance.set(ct);
+ return ct;
+ }
- private ConnectionTable(TCPConduit conduit) throws IOException {
- this.owner = conduit;
- this.idleConnTimer = (this.owner.idleConnectionTimeout != 0)
+ private ConnectionTable(TCPConduit conduit) {
+ owner = conduit;
+ idleConnTimer = owner.idleConnectionTimeout != 0
? new SystemTimer(conduit.getDM().getSystem(), true) : null;
- this.threadOrderedConnMap = new ThreadLocal();
- this.threadConnMaps = new ArrayList();
- this.threadConnectionMap = new ConcurrentHashMap();
- this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
- this.socketCloser = new SocketCloser();
- this.bufferPool = new BufferPool(owner.getStats());
+ threadConnMaps = new ArrayList();
+ threadConnectionMap = new ConcurrentHashMap();
+ p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
+ socketCloser = new SocketCloser();
+ bufferPool = new BufferPool(owner.getStats());
}
private Executor createThreadPoolForIO(boolean conserveSockets) {
if (conserveSockets) {
return LoggingExecutors.newThreadOnEachExecute("SharedP2PReader");
- } else {
- return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("UnsharedP2PReader", 1,
- Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME);
}
+ return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("UnsharedP2PReader", 1,
+ Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME);
}
/** conduit calls acceptConnection after an accept */
- protected void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
- throws IOException, ConnectionException, InterruptedException {
- InetAddress connAddress = sock.getInetAddress(); // for bug 44736
+ void acceptConnection(Socket sock, PeerConnectionFactory peerConnectionFactory)
+ throws IOException, ConnectionException {
+ InetAddress connAddress = sock.getInetAddress();
boolean finishedConnecting = false;
Connection connection = null;
- // boolean exceptionLogged = false;
try {
connection = peerConnectionFactory.createReceiver(this, sock);
// check for shutdown (so it doesn't get missed in the finally block)
- this.owner.getCancelCriterion().checkCancelInProgress(null);
+ owner.getCancelCriterion().checkCancelInProgress(null);
finishedConnecting = true;
- } catch (IOException ex) {
- // check for shutdown...
- this.owner.getCancelCriterion().checkCancelInProgress(ex);
- logger.warn(String.format("Failed to accept connection from %s because: %s",
- new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
- throw ex;
- } catch (ConnectionException ex) {
+ } catch (ConnectionException | IOException ex) {
// check for shutdown...
- this.owner.getCancelCriterion().checkCancelInProgress(ex);
- logger.warn(String.format("Failed to accept connection from %s because: %s",
- new Object[] {(connAddress != null ? connAddress : "unavailable address"), ex}));
+ owner.getCancelCriterion().checkCancelInProgress(ex);
+ logger.warn("Failed to accept connection from {} because: {}",
+ connAddress != null ? connAddress : "unavailable address", ex);
throw ex;
} finally {
- // note: no need to call incFailedAccept here because it will be done
- // in our caller.
+ // note: no need to call incFailedAccept here because it will be done in our caller.
// no need to log error here since caller will log warning
if (connection != null && !finishedConnecting) {
// we must be throwing from checkCancelInProgress so close the connection
- closeCon("cancel after accept",
- connection);
+ closeCon("cancel after accept", connection);
connection = null;
}
}
if (connection != null) {
- synchronized (this.receivers) {
- this.owner.getStats().incReceivers();
- if (this.closed) {
+ synchronized (receivers) {
+ owner.getStats().incReceivers();
+ if (closed) {
closeCon("Connection table no longer in use", connection);
return;
}
// If connection.stopped is false, any connection cleanup thread will not yet have acquired
// the receiver synchronization to remove the receiver. Therefore we can safely add it here.
if (!(connection.isSocketClosed() || connection.isReceiverStopped())) {
- this.receivers.add(connection);
+ receivers.add(connection);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Accepted {} myAddr={} theirAddr={}", connection, getConduit().getMemberId(),
- connection.remoteAddr);
+ connection.getRemoteAddress());
}
}
}
-
-
/**
* Process a newly created PendingConnection
*
@@ -298,11 +284,11 @@ public class ConnectionTable {
try {
con = Connection.createSender(owner.getMembership(), this, preserveOrder, id,
sharedResource, startTime, ackThreshold, ackSAThreshold);
- this.owner.getStats().incSenders(sharedResource, preserveOrder);
+ owner.getStats().incSenders(sharedResource, preserveOrder);
} finally {
// our connection failed to notify anyone waiting for our pending con
if (con == null) {
- this.owner.getStats().incFailedConnect();
+ owner.getStats().incFailedConnect();
synchronized (m) {
Object rmObj = m.remove(id);
if (rmObj != pc && rmObj != null) {
@@ -313,10 +299,9 @@ public class ConnectionTable {
pc.notifyWaiters(null);
// we must be throwing an exception
}
- } // finally
+ }
- // Update our list of connections -- either the
- // orderedConnectionMap or unorderedConnectionMap
+ // Update our list of connections -- either the orderedConnectionMap or unorderedConnectionMap
//
// Note that we added the entry _before_ we attempted the connect,
// so it's possible something else got through in the mean time...
@@ -325,30 +310,21 @@ public class ConnectionTable {
if (e == pc) {
m.put(id, con);
} else if (e == null) {
- // someone closed our pending connection
- // so cleanup the connection we created
- con.requestClose(
- "pending connection cancelled");
+ // someone closed our pending connection so cleanup the connection we created
+ con.requestClose("pending connection cancelled");
con = null;
} else {
if (e instanceof Connection) {
Connection newCon = (Connection) e;
if (!newCon.connected) {
- // Fix for bug 31590
- // someone closed our pending connect
- // so cleanup the connection we created
+ // someone closed our pending connect so cleanup the connection we created
if (con != null) {
- con.requestClose(
- "pending connection closed");
+ con.requestClose("pending connection closed");
con = null;
}
} else {
- // This should not happen. It means that someone else
- // created the connection which should only happen if
- // our Connection was rejected.
- // Assert.assertTrue(false);
- // The above assertion was commented out to try the
- // following with bug 32680
+ // This should not happen. It means that someone else created the connection which
+ // should only happen if our Connection was rejected.
if (con != null) {
con.requestClose("someone else created the connection");
}
@@ -360,7 +336,7 @@ public class ConnectionTable {
pc.notifyWaiters(con);
if (con != null && logger.isDebugEnabled()) {
logger.debug("handleNewPendingConnection {} myAddr={} theirAddr={}", con,
- getConduit().getMemberId(), con.remoteAddr);
+ getConduit().getMemberId(), con.getRemoteAddress());
}
return con;
@@ -381,17 +357,19 @@ public class ConnectionTable {
private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
- Connection result = null;
- final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
+ final Map m = preserveOrder ? orderedConnectionMap : unorderedConnectionMap;
- PendingConnection pc = null; // new connection, if needed
- Object mEntry = null; // existing connection (if we don't create a new one)
+ // new connection, if needed
+ PendingConnection pc = null;
+
+ // existing connection (if we don't create a new one)
+ Object mEntry;
// Look for pending connection
synchronized (m) {
mEntry = m.get(id);
- if (mEntry != null && (mEntry instanceof Connection)) {
+ if (mEntry instanceof Connection) {
Connection existingCon = (Connection) mEntry;
if (!existingCon.connected) {
mEntry = null;
@@ -401,17 +379,19 @@ public class ConnectionTable {
pc = new PendingConnection(preserveOrder, id);
m.put(id, pc);
}
- } // synchronized
+ }
+ Connection result;
if (pc != null) {
if (logger.isDebugEnabled()) {
logger.debug("created PendingConnection {}", pc);
}
- result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc,
+ result = handleNewPendingConnection(id, true, preserveOrder, m, pc,
startTime, ackTimeout, ackSATimeout);
if (!preserveOrder && scheduleTimeout) {
scheduleIdleTimeout(result);
}
+
} else { // we have existing connection
if (mEntry instanceof PendingConnection) {
@@ -420,12 +400,12 @@ public class ConnectionTable {
throw new IOException("Cannot form connection to alert listener " + id);
}
- result = ((PendingConnection) mEntry).waitForConnect(this.owner.getMembership(),
+ result = ((PendingConnection) mEntry).waitForConnect(owner.getMembership(),
startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
if (result != null) {
logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
- getConduit().getMemberId(), result.remoteAddr);
+ getConduit().getMemberId(), result.getRemoteAddress());
} else {
logger.debug("getSharedConnection: Connect failed");
}
@@ -448,31 +428,30 @@ public class ConnectionTable {
* @return the connection, or null if an error
* @throws IOException if the connection could not be created
*/
- Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
+ private Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
// Look for result in the thread local
- Map m = (Map) this.threadOrderedConnMap.get();
+ Map m = threadOrderedConnMap.get();
if (m == null) {
// First time for this thread. Create thread local
m = new HashMap();
- synchronized (this.threadConnMaps) {
- if (this.closed) {
+ synchronized (threadConnMaps) {
+ if (closed) {
owner.getCancelCriterion().checkCancelInProgress(null);
- throw new DistributedSystemDisconnectedException(
- "Connection table is closed");
+ throw new DistributedSystemDisconnectedException("Connection table is closed");
}
// check for stale references and remove them.
- for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
+ for (Iterator it = threadConnMaps.iterator(); it.hasNext();) {
Reference r = (Reference) it.next();
if (r.get() == null) {
it.remove();
}
- } // for
- this.threadConnMaps.add(new WeakReference(m)); // ref added for bug 38011
- } // synchronized
- this.threadOrderedConnMap.set(m);
+ }
+ threadConnMaps.add(new WeakReference(m));
+ }
+ threadOrderedConnMap.set(m);
} else {
// Consult thread local.
synchronized (m) {
@@ -486,24 +465,22 @@ public class ConnectionTable {
return result;
// OK, we have to create a new connection.
- result = Connection.createSender(owner.getMembership(), this, true /* preserveOrder */,
- id, false /* shared */, startTime, ackTimeout, ackSATimeout);
+ result = Connection.createSender(owner.getMembership(), this, true, id, false, startTime,
+ ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
logger.debug("ConnectionTable: created an ordered connection: {}", result);
}
- this.owner.getStats().incSenders(false/* shared */, true /* preserveOrder */);
+ owner.getStats().incSenders(false, true);
// Update the list of connections owned by this thread....
- if (this.threadConnectionMap == null) {
+ if (threadConnectionMap == null) {
// This instance is being destroyed; fail the operation
- closeCon(
- "Connection table being destroyed",
- result);
+ closeCon("Connection table being destroyed", result);
return null;
}
- ArrayList al = (ArrayList) this.threadConnectionMap.get(id);
+ ArrayList al = (ArrayList) threadConnectionMap.get(id);
if (al == null) {
// First connection for this DistributedMember. Make sure list for this
// stub is created if it isn't already there.
@@ -511,7 +488,7 @@ public class ConnectionTable {
// Since it's a concurrent map, we just try to put it and then
// return whichever we got.
- Object o = this.threadConnectionMap.putIfAbsent(id, al);
+ Object o = threadConnectionMap.putIfAbsent(id, al);
if (o != null) {
al = (ArrayList) o;
}
@@ -534,37 +511,34 @@ public class ConnectionTable {
/** schedule an idle-connection timeout task */
private void scheduleIdleTimeout(Connection conn) {
if (conn == null) {
- // fix for bug 43529
return;
}
// Set the idle timeout
- if (this.owner.idleConnectionTimeout != 0) {
+ if (owner.idleConnectionTimeout != 0) {
try {
synchronized (this) {
- if (!this.closed) {
+ if (!closed) {
IdleConnTT task = new IdleConnTT(conn);
conn.setIdleTimeoutTask(task);
- this.getIdleConnTimer().scheduleAtFixedRate(task, this.owner.idleConnectionTimeout,
- this.owner.idleConnectionTimeout);
+ getIdleConnTimer().scheduleAtFixedRate(task, owner.idleConnectionTimeout,
+ owner.idleConnectionTimeout);
}
}
} catch (IllegalStateException e) {
if (conn.isClosing()) {
- // bug #45077 - connection is closed before we schedule the timeout task,
+ // connection is closed before we schedule the timeout task,
// causing the task to be canceled
return;
}
logger.debug("Got an illegal state exception: {}", e.getMessage(), e);
- // Unfortunately, cancelInProgress() is not set until *after*
- // the shutdown message has been sent, so we need to check the
- // "closeInProgress" bit instead.
+ // Unfortunately, cancelInProgress() is not set until *after* the shutdown message has been
+ // sent, so we need to check the "closeInProgress" bit instead.
owner.getCancelCriterion().checkCancelInProgress(null);
Throwable cause = owner.getShutdownCause();
if (cause == null) {
cause = e;
}
- throw new DistributedSystemDisconnectedException(
- "The distributed system is shutting down",
+ throw new DistributedSystemDisconnectedException("The distributed system is shutting down",
cause);
}
}
@@ -579,17 +553,16 @@ public class ConnectionTable {
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
* @return the new Connection, or null if a problem
- * @throws java.io.IOException if the connection could not be created
+ * @throws IOException if the connection could not be created
*/
protected Connection get(DistributedMember id, boolean preserveOrder, long startTime,
long ackTimeout, long ackSATimeout)
- throws java.io.IOException, DistributedSystemDisconnectedException {
- if (this.closed) {
- this.owner.getCancelCriterion().checkCancelInProgress(null);
- throw new DistributedSystemDisconnectedException(
- "Connection table is closed");
+ throws IOException, DistributedSystemDisconnectedException {
+ if (closed) {
+ owner.getCancelCriterion().checkCancelInProgress(null);
+ throw new DistributedSystemDisconnectedException("Connection table is closed");
}
- Connection result = null;
+ Connection result;
boolean threadOwnsResources = threadOwnsResources();
if (!preserveOrder || !threadOwnsResources) {
result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
@@ -603,26 +576,25 @@ public class ConnectionTable {
return result;
}
- protected synchronized void fileDescriptorsExhausted() {
+ synchronized void fileDescriptorsExhausted() {
if (!ulimitWarningIssued) {
ulimitWarningIssued = true;
logger.fatal(
"This process is out of file descriptors.This will hamper communications and slow down the system.Any conserve-sockets setting is now being ignored.Please consider raising the descriptor limit.This alert is only issued once per process.");
InternalDistributedSystem.getAnyInstance().setShareSockets(true);
- threadWantsOwnResources = new ThreadLocal();
}
}
- protected TCPConduit getConduit() {
+ TCPConduit getConduit() {
return owner;
}
- public BufferPool getBufferPool() {
+ BufferPool getBufferPool() {
return bufferPool;
}
public boolean isClosed() {
- return this.closed;
+ return closed;
}
private static void closeCon(String reason, Object c) {
@@ -634,7 +606,7 @@ public class ConnectionTable {
return;
}
if (c instanceof Connection) {
- ((Connection) c).closePartialConnect(reason, beingSick); // fix for bug 31666
+ ((Connection) c).closePartialConnect(reason, beingSick);
} else {
((PendingConnection) c).notifyWaiters(null);
}
@@ -644,85 +616,76 @@ public class ConnectionTable {
* returns the idle connection timer, or null if the connection table is closed. guarded by a sync
* on the connection table
*/
- protected synchronized SystemTimer getIdleConnTimer() {
- if (this.closed) {
+ synchronized SystemTimer getIdleConnTimer() {
+ if (closed) {
return null;
}
- if (this.idleConnTimer == null) {
- this.idleConnTimer = new SystemTimer(getDM().getSystem(), true);
+ if (idleConnTimer == null) {
+ idleConnTimer = new SystemTimer(getDM().getSystem(), true);
}
- return this.idleConnTimer;
+ return idleConnTimer;
}
protected void close() {
- /*
- * NOMUX if (inputMuxManager != null) { inputMuxManager.stop(); }
- */
- if (this.closed) {
+ if (closed) {
return;
}
- this.closed = true;
+ closed = true;
synchronized (this) {
- if (this.idleConnTimer != null) {
- this.idleConnTimer.cancel();
+ if (idleConnTimer != null) {
+ idleConnTimer.cancel();
}
}
- synchronized (this.orderedConnectionMap) {
- for (Iterator it = this.orderedConnectionMap.values().iterator(); it.hasNext();) {
- closeCon(
- "Connection table being destroyed",
- it.next());
+ synchronized (orderedConnectionMap) {
+ for (Object o : orderedConnectionMap.values()) {
+ closeCon("Connection table being destroyed", o);
}
- this.orderedConnectionMap.clear();
+ orderedConnectionMap.clear();
}
- synchronized (this.unorderedConnectionMap) {
- for (Iterator it = this.unorderedConnectionMap.values().iterator(); it.hasNext();) {
- closeCon(
- "Connection table being destroyed",
- it.next());
+ synchronized (unorderedConnectionMap) {
+ for (Object o : unorderedConnectionMap.values()) {
+ closeCon("Connection table being destroyed", o);
}
- this.unorderedConnectionMap.clear();
+ unorderedConnectionMap.clear();
}
- if (this.threadConnectionMap != null) {
- this.threadConnectionMap = null;
+ if (threadConnectionMap != null) {
+ threadConnectionMap = null;
}
- if (this.threadConnMaps != null) {
- synchronized (this.threadConnMaps) {
- for (Iterator it = this.threadConnMaps.iterator(); it.hasNext();) {
- Reference r = (Reference) it.next();
- Map m = (Map) r.get();
- if (m != null) {
- synchronized (m) {
- for (Iterator mit = m.values().iterator(); mit.hasNext();) {
- closeCon("Connection table being destroyed", mit.next());
+ if (threadConnMaps != null) {
+ synchronized (threadConnMaps) {
+ for (Object threadConnMap : threadConnMaps) {
+ Reference reference = (Reference) threadConnMap;
+ Map map = (Map) reference.get();
+ if (map != null) {
+ synchronized (map) {
+ for (Object o : map.values()) {
+ closeCon("Connection table being destroyed", o);
}
}
}
}
- this.threadConnMaps.clear();
+ threadConnMaps.clear();
}
}
- {
- Executor localExec = this.p2pReaderThreadPool;
- if (localExec != null) {
- if (localExec instanceof ExecutorService) {
- ((ExecutorService) localExec).shutdown();
- }
+ Executor localExec = p2pReaderThreadPool;
+ if (localExec != null) {
+ if (localExec instanceof ExecutorService) {
+ ((ExecutorService) localExec).shutdown();
}
}
closeReceivers(false);
- Map m = (Map) this.threadOrderedConnMap.get();
- if (m != null) {
- synchronized (m) {
- m.clear();
+ Map map = threadOrderedConnMap.get();
+ if (map != null) {
+ synchronized (map) {
+ map.clear();
}
}
- this.socketCloser.close();
+ socketCloser.close();
}
public void executeCommand(Runnable runnable) {
- Executor local = this.p2pReaderThreadPool;
+ Executor local = p2pReaderThreadPool;
if (local != null) {
local.execute(runnable);
}
@@ -734,14 +697,12 @@ public class ConnectionTable {
*
* @param beingSick a test hook to simulate a sick process
*/
- protected void closeReceivers(boolean beingSick) {
- synchronized (this.receivers) {
- for (Iterator it = this.receivers.iterator(); it.hasNext();) {
+ private void closeReceivers(boolean beingSick) {
+ synchronized (receivers) {
+ for (Iterator it = receivers.iterator(); it.hasNext();) {
Connection con = (Connection) it.next();
if (!beingSick || con.preserveOrder) {
- closeCon(
- "Connection table being destroyed",
- con, beingSick);
+ closeCon("Connection table being destroyed", con, beingSick);
it.remove();
}
}
@@ -749,7 +710,6 @@ public class ConnectionTable {
synchronized (connectingSockets) {
for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
- // ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue();
try {
((Socket) entry.getKey()).close();
} catch (IOException e) {
@@ -761,103 +721,90 @@ public class ConnectionTable {
}
}
-
- protected void removeReceiver(Object con) {
- synchronized (this.receivers) {
- this.receivers.remove(con);
+ void removeReceiver(Object con) {
+ synchronized (receivers) {
+ receivers.remove(con);
}
}
/**
- * Return true if our owner already knows that this endpoint is departing
+ * remove an endpoint and notify the membership manager of the departure
*/
- protected boolean isEndpointShuttingDown(DistributedMember id) {
- return giveUpOnMember(owner.getDM().getDistribution(), id);
- }
-
- protected boolean giveUpOnMember(Distribution mgr, DistributedMember remoteAddr) {
- return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress();
- }
-
- /** remove an endpoint and notify the membership manager of the departure */
- protected void removeEndpoint(DistributedMember stub, String reason) {
+ void removeEndpoint(DistributedMember stub, String reason) {
removeEndpoint(stub, reason, true);
}
- protected void removeEndpoint(DistributedMember memberID, String reason,
- boolean notifyDisconnect) {
- if (this.closed) {
+ void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) {
+ if (closed) {
return;
}
boolean needsRemoval = false;
- synchronized (this.orderedConnectionMap) {
- if (this.orderedConnectionMap.get(memberID) != null)
+ synchronized (orderedConnectionMap) {
+ if (orderedConnectionMap.get(memberID) != null)
needsRemoval = true;
}
if (!needsRemoval) {
- synchronized (this.unorderedConnectionMap) {
- if (this.unorderedConnectionMap.get(memberID) != null)
+ synchronized (unorderedConnectionMap) {
+ if (unorderedConnectionMap.get(memberID) != null)
needsRemoval = true;
}
}
if (!needsRemoval) {
- ConcurrentMap cm = this.threadConnectionMap;
+ ConcurrentMap cm = threadConnectionMap;
if (cm != null) {
- ArrayList al = (ArrayList) cm.get(memberID);
- needsRemoval = al != null && al.size() > 0;
+ List al = (ArrayList) cm.get(memberID);
+ needsRemoval = al != null && !al.isEmpty();
}
}
if (needsRemoval) {
InternalDistributedMember remoteAddress = null;
- synchronized (this.orderedConnectionMap) {
- Object c = this.orderedConnectionMap.remove(memberID);
+ synchronized (orderedConnectionMap) {
+ Object c = orderedConnectionMap.remove(memberID);
if (c instanceof Connection) {
remoteAddress = ((Connection) c).getRemoteAddress();
}
closeCon(reason, c);
}
- synchronized (this.unorderedConnectionMap) {
- Object c = this.unorderedConnectionMap.remove(memberID);
- if (remoteAddress == null && (c instanceof Connection)) {
+ synchronized (unorderedConnectionMap) {
+ Object c = unorderedConnectionMap.remove(memberID);
+ if (remoteAddress == null && c instanceof Connection) {
remoteAddress = ((Connection) c).getRemoteAddress();
}
closeCon(reason, c);
}
- {
- ConcurrentMap cm = this.threadConnectionMap;
- if (cm != null) {
- ArrayList al = (ArrayList) cm.remove(memberID);
- if (al != null) {
- synchronized (al) {
- for (Iterator it = al.iterator(); it.hasNext();) {
- Object c = it.next();
- if (remoteAddress == null && (c instanceof Connection)) {
- remoteAddress = ((Connection) c).getRemoteAddress();
- }
- closeCon(reason, c);
+ ConcurrentMap cm = threadConnectionMap;
+ if (cm != null) {
+ List al = (ArrayList) cm.remove(memberID);
+ if (al != null) {
+ synchronized (al) {
+ for (Object c : al) {
+ if (remoteAddress == null && c instanceof Connection) {
+ remoteAddress = ((Connection) c).getRemoteAddress();
}
- al.clear();
+ closeCon(reason, c);
}
+ al.clear();
}
}
}
// close any sockets that are in the process of being connected
- Set toRemove = new HashSet();
+ Collection toRemove = new HashSet();
synchronized (connectingSockets) {
for (Iterator it = connectingSockets.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
ConnectingSocketInfo info = (ConnectingSocketInfo) entry.getValue();
- if (info.peerAddress.equals(((InternalDistributedMember) memberID).getInetAddress())) {
+ if (info.peerAddress.equals(((MemberIdentifier) memberID).getInetAddress())) {
toRemove.add(entry.getKey());
it.remove();
}
}
}
- for (Iterator it = toRemove.iterator(); it.hasNext();) {
- Socket sock = (Socket) it.next();
+
+ for (Object o : toRemove) {
+ Closeable sock = (Socket) o;
try {
sock.close();
} catch (IOException e) {
@@ -869,10 +816,9 @@ public class ConnectionTable {
}
// close any receivers
- // avoid deadlock when a NIC has failed by closing connections outside
- // of the receivers sync (bug 38731)
+ // avoid deadlock when a NIC has failed by closing connections outside of the receivers sync
toRemove.clear();
- synchronized (this.receivers) {
+ synchronized (receivers) {
for (Iterator it = receivers.iterator(); it.hasNext();) {
Connection con = (Connection) it.next();
if (memberID.equals(con.getRemoteAddress())) {
@@ -881,8 +827,8 @@ public class ConnectionTable {
}
}
}
- for (Iterator it = toRemove.iterator(); it.hasNext();) {
- Connection con = (Connection) it.next();
+ for (Object o : toRemove) {
+ Connection con = (Connection) o;
closeCon(reason, con);
}
if (notifyDisconnect) {
@@ -893,20 +839,20 @@ public class ConnectionTable {
}
if (remoteAddress != null) {
- this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
+ socketCloser.releaseResourcesForAddress(remoteAddress.toString());
}
}
}
SocketCloser getSocketCloser() {
- return this.socketCloser;
+ return socketCloser;
}
/** check to see if there are still any receiver threads for the given end-point */
- protected boolean hasReceiversFor(DistributedMember endPoint) {
- synchronized (this.receivers) {
- for (Iterator it = receivers.iterator(); it.hasNext();) {
- Connection con = (Connection) it.next();
+ boolean hasReceiversFor(DistributedMember endPoint) {
+ synchronized (receivers) {
+ for (Object receiver : receivers) {
+ Connection con = (Connection) receiver;
if (endPoint.equals(con.getRemoteAddress())) {
return true;
}
@@ -918,7 +864,7 @@ public class ConnectionTable {
private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub,
Connection c) {
if (cm != null) {
- ArrayList al = (ArrayList) cm.get(stub);
+ List al = (ArrayList) cm.get(stub);
if (al != null) {
synchronized (al) {
al.remove(c);
@@ -927,37 +873,34 @@ public class ConnectionTable {
}
}
- protected void removeThreadConnection(DistributedMember stub, Connection c) {
- /*
- * if (this.closed) { return; }
- */
- removeFromThreadConMap(this.threadConnectionMap, stub, c);
- Map m = (Map) this.threadOrderedConnMap.get();
+ void removeThreadConnection(DistributedMember stub, Connection c) {
+ removeFromThreadConMap(threadConnectionMap, stub, c);
+ Map m = threadOrderedConnMap.get();
if (m != null) {
// Static cleanup thread might intervene, so we MUST synchronize
synchronized (m) {
if (m.get(stub) == c) {
m.remove(stub);
}
- } // synchronized
- } // m != null
+ }
+ }
}
void removeSharedConnection(String reason, DistributedMember stub, boolean ordered,
Connection c) {
- if (this.closed) {
+ if (closed) {
return;
}
if (ordered) {
- synchronized (this.orderedConnectionMap) {
- if (this.orderedConnectionMap.get(stub) == c) {
- closeCon(reason, this.orderedConnectionMap.remove(stub));
+ synchronized (orderedConnectionMap) {
+ if (orderedConnectionMap.get(stub) == c) {
+ closeCon(reason, orderedConnectionMap.remove(stub));
}
}
} else {
- synchronized (this.unorderedConnectionMap) {
- if (this.unorderedConnectionMap.get(stub) == c) {
- closeCon(reason, this.unorderedConnectionMap.remove(stub));
+ synchronized (unorderedConnectionMap) {
+ if (unorderedConnectionMap.get(stub) == c) {
+ closeCon(reason, unorderedConnectionMap.remove(stub));
}
}
}
@@ -986,8 +929,8 @@ public class ConnectionTable {
lastInstance.set(null);
}
- public void removeAndCloseThreadOwnedSockets() {
- Map m = (Map) this.threadOrderedConnMap.get();
+ void removeAndCloseThreadOwnedSockets() {
+ Map m = threadOrderedConnMap.get();
if (m != null) {
// Static cleanup may intervene; we MUST synchronize.
synchronized (m) {
@@ -996,11 +939,11 @@ public class ConnectionTable {
Map.Entry me = (Map.Entry) it.next();
DistributedMember stub = (DistributedMember) me.getKey();
Connection c = (Connection) me.getValue();
- removeFromThreadConMap(this.threadConnectionMap, stub, c);
+ removeFromThreadConMap(threadConnectionMap, stub, c);
it.remove();
closeCon("thread finalization", c);
- } // while
- } // synchronized m
+ }
+ }
}
}
@@ -1010,7 +953,6 @@ public class ConnectionTable {
return;
}
ct.removeAndCloseThreadOwnedSockets();
- // lastInstance = null;
}
/**
@@ -1019,9 +961,8 @@ public class ConnectionTable {
*
* @since GemFire 5.1
*/
- protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
-
- ConcurrentMap cm = this.threadConnectionMap;
+ void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
+ ConcurrentMap cm = threadConnectionMap;
if (cm != null) {
ArrayList al = (ArrayList) cm.get(member);
if (al != null) {
@@ -1029,10 +970,10 @@ public class ConnectionTable {
al = new ArrayList(al);
}
- for (Iterator it = al.iterator(); it.hasNext();) {
- Connection conn = (Connection) it.next();
+ for (Object o : al) {
+ Connection conn = (Connection) o;
if (!conn.isSharedResource() && conn.getOriginatedHere() && conn.getPreserveOrder()) {
- result.put(Long.valueOf(conn.getUniqueId()), Long.valueOf(conn.getMessagesSent()));
+ result.put(conn.getUniqueId(), conn.getMessagesSent());
}
}
}
@@ -1042,21 +983,23 @@ public class ConnectionTable {
/**
* wait for the given incoming connections to receive at least the associated number of messages
*/
- protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member,
- Map connectionStates) throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException(); // wisest to do this before the synchronize below
- List r = null;
+ void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map connectionStates)
+ throws InterruptedException {
+ if (Thread.interrupted()) {
+ // wisest to do this before the synchronize below
+ throw new InterruptedException();
+ }
+ List r;
synchronized (receivers) {
r = new ArrayList(receivers);
}
- for (Iterator it = r.iterator(); it.hasNext();) {
- Connection con = (Connection) it.next();
+ for (Object o : r) {
+ Connection con = (Connection) o;
if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder()
&& member.equals(con.getRemoteAddress())) {
- Long state = (Long) connectionStates.remove(Long.valueOf(con.getUniqueId()));
+ Long state = (Long) connectionStates.remove(con.getUniqueId());
if (state != null) {
- long count = state.longValue();
+ long count = state;
while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) {
if (logger.isDebugEnabled()) {
logger.debug("Waiting for connection {}/{} currently={} need={}",
@@ -1067,7 +1010,7 @@ public class ConnectionTable {
}
}
}
- if (connectionStates.size() > 0) {
+ if (!connectionStates.isEmpty()) {
if (logger.isDebugEnabled()) {
StringBuffer sb = new StringBuffer(1000);
sb.append("These connections from ");
@@ -1086,41 +1029,29 @@ public class ConnectionTable {
}
protected DistributionManager getDM() {
- return this.owner.getDM();
+ return owner.getDM();
}
- // public boolean isShuttingDown() {
- // return this.owner.isShuttingDown();
- // }
-
- // protected void cleanupHighWater() {
- // cleanup(highWater);
- // }
-
- // protected void cleanupLowWater() {
- // cleanup(lowWater);
- // }
-
- // private void cleanup(int maxConnections) {
- /*
- * if (maxConnections == 0 || maxConnections >= connections.size()) { return; } while
- * (connections.size() > maxConnections) { Connection oldest = null; synchronized(connections) {
- * for (Iterator iter = connections.values().iterator(); iter.hasNext(); ) { Connection c =
- * (Connection)iter.next(); if (oldest == null || c.getTimeStamp() < oldest.getTimeStamp()) {
- * oldest = c; } } } // sanity check - don't close anything fresher than 10 seconds or // we'll
- * start thrashing if (oldest.getTimeStamp() > (System.currentTimeMillis() - 10000)) { if
- * (owner.lowWaterConnectionCount > 0) { owner.lowWaterConnectionCount += 10; } if
- * (owner.highWaterConnectionCount > 0) { owner.highWaterConnectionCount += 10; } new Object[] {
- * owner.lowWaterConnectionCount, owner.highWaterConnectionCount }); break; } if (oldest != null)
- * { oldest.close(); } }
- */
- // }
+ /** keep track of a socket that is trying to connect() for shutdown purposes */
+ void addConnectingSocket(Socket socket, InetAddress addr) {
+ synchronized (connectingSockets) {
+ connectingSockets.put(socket, new ConnectingSocketInfo(addr));
+ }
+ }
+
+ /** remove a socket from the tracked set. It should be connected at this point */
+ void removeConnectingSocket(Socket socket) {
+ synchronized (connectingSockets) {
+ connectingSockets.remove(socket);
+ }
+ }
+
+ int getNumberOfReceivers() {
+ return receivers.size();
+ }
+
+ private class PendingConnection {
- /*
- * public void dumpConnectionTable() { Iterator iter = connectionMap.keySet().iterator(); while
- * (iter.hasNext()) { Object key = iter.next(); Object val = connectionMap.get(key); } }
- */
- private /* static */ class PendingConnection {
/**
* true if this connection is still pending
*/
@@ -1129,7 +1060,7 @@ public class ConnectionTable {
/**
* the connection we are waiting on
*/
- private Connection conn = null;
+ private Connection conn;
/**
* whether the connection preserves message ordering
@@ -1143,10 +1074,10 @@ public class ConnectionTable {
private final Thread connectingThread;
- public PendingConnection(boolean preserveOrder, DistributedMember id) {
+ private PendingConnection(boolean preserveOrder, DistributedMember id) {
this.preserveOrder = preserveOrder;
this.id = id;
- this.connectingThread = Thread.currentThread();
+ connectingThread = Thread.currentThread();
}
/**
@@ -1154,17 +1085,18 @@ public class ConnectionTable {
*
* @param c the new connection
*/
- public synchronized void notifyWaiters(Connection c) {
- if (!this.pending)
- return; // already done.
+ private synchronized void notifyWaiters(Connection c) {
+ if (!pending) {
+ return;
+ }
- this.conn = c;
- this.pending = false;
+ conn = c;
+ pending = false;
if (logger.isDebugEnabled()) {
logger.debug("Notifying waiters that pending {} connection to {} is ready; {}",
- ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
+ preserveOrder ? "ordered" : "unordered", id, this);
}
- this.notifyAll();
+ notifyAll();
}
/**
@@ -1176,24 +1108,23 @@ public class ConnectionTable {
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
* @return the new connection
*/
- public synchronized Connection waitForConnect(Membership mgr, long startTime,
- long ackTimeout, long ackSATimeout) throws IOException {
+ private synchronized Connection waitForConnect(Membership mgr, long startTime, long ackTimeout,
+ long ackSATimeout) {
if (connectingThread == Thread.currentThread()) {
throw new ReenteredConnectException("This thread is already trying to connect");
}
- final Map m = this.preserveOrder ? orderedConnectionMap : unorderedConnectionMap;
+ final Map m = preserveOrder ? orderedConnectionMap : unorderedConnectionMap;
- boolean severeAlertIssued = false;
- boolean suspected = false;
DistributedMember targetMember = null;
if (ackSATimeout > 0) {
- targetMember = this.id;
+ targetMember = id;
}
- int attempt = 0;
- for (;;) {
- if (!this.pending) {
+ boolean suspected = false;
+ boolean severeAlertIssued = false;
+ for (int attempt = 0;;) {
+ if (!pending) {
break;
}
getConduit().getCancelCriterion().checkCancelInProgress(null);
@@ -1201,7 +1132,7 @@ public class ConnectionTable {
// wait a little bit...
boolean interrupted = Thread.interrupted();
try {
- this.wait(100); // spurious wakeup ok
+ wait(100);
} catch (InterruptedException ignore) {
interrupted = true;
getConduit().getCancelCriterion().checkCancelInProgress(ignore);
@@ -1211,8 +1142,9 @@ public class ConnectionTable {
}
}
- if (!this.pending)
+ if (!pending) {
break;
+ }
// Still pending...
long now = System.currentTimeMillis();
@@ -1225,22 +1157,19 @@ public class ConnectionTable {
severeAlertIssued = true;
} else if (!suspected) {
logger.warn("Unable to form a TCP/IP connection to %s in over %s seconds",
- this.id, (ackTimeout) / 1000);
- ((GMSMembership) mgr).suspectMember((InternalDistributedMember) targetMember,
+ id, ackTimeout / 1000);
+ mgr.suspectMember(targetMember,
"Unable to form a TCP/IP connection in a reasonable amount of time");
suspected = true;
}
}
- Object e;
- // synchronized (m) {
- e = m.get(this.id);
- // }
+ Object e = m.get(id);
if (e == this) {
attempt += 1;
- if (logger.isDebugEnabled() && (attempt % 20 == 1)) {
+ if (logger.isDebugEnabled() && attempt % 20 == 1) {
logger.debug("Waiting for pending connection to complete: {} connection to {}; {}",
- ((this.preserveOrder) ? "ordered" : "unordered"), this.id, this);
+ preserveOrder ? "ordered" : "unordered", id, this);
}
continue;
}
@@ -1254,17 +1183,16 @@ public class ConnectionTable {
// We were removed
notifyWaiters(null);
break;
- } else if (e instanceof Connection) {
+ }
+ if (e instanceof Connection) {
notifyWaiters((Connection) e);
break;
- } else {
- // defer to the new instance
- return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout);
}
+ // defer to the new instance
+ return ((PendingConnection) e).waitForConnect(mgr, startTime, ackTimeout, ackSATimeout);
- } // for
- return this.conn;
-
+ }
+ return conn;
}
public String toString() {
@@ -1272,28 +1200,27 @@ public class ConnectionTable {
}
}
-
private static class IdleConnTT extends SystemTimer.SystemTimerTask {
private Connection c;
- IdleConnTT(Connection c) {
+ private IdleConnTT(Connection c) {
this.c = c;
}
@Override
public boolean cancel() {
- Connection con = this.c;
+ Connection con = c;
if (con != null) {
con.cleanUpOnIdleTaskCancel();
}
- this.c = null;
+ c = null;
return super.cancel();
}
@Override
public void run2() {
- Connection con = this.c;
+ Connection con = c;
if (con != null) {
if (con.checkForIdleTimeout()) {
cancel();
@@ -1302,38 +1229,12 @@ public class ConnectionTable {
}
}
- public static ConnectionTable create(TCPConduit conduit) throws IOException {
- ConnectionTable ct = new ConnectionTable(conduit);
- lastInstance.set(ct);
- return ct;
- }
-
- /** keep track of a socket that is trying to connect() for shutdown purposes */
- public void addConnectingSocket(Socket socket, InetAddress addr) {
- synchronized (connectingSockets) {
- connectingSockets.put(socket, new ConnectingSocketInfo(addr));
- }
- }
-
- /** remove a socket from the tracked set. It should be connected at this point */
- public void removeConnectingSocket(Socket socket) {
- synchronized (connectingSockets) {
- connectingSockets.remove(socket);
- }
- }
-
-
private static class ConnectingSocketInfo {
- InetAddress peerAddress;
- Thread connectingThread;
- public ConnectingSocketInfo(InetAddress addr) {
- this.peerAddress = addr;
- this.connectingThread = Thread.currentThread();
- }
- }
+ private final InetAddress peerAddress;
- public int getNumberOfReceivers() {
- return receivers.size();
+ private ConnectingSocketInfo(InetAddress addr) {
+ peerAddress = addr;
+ }
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 365bf35..de36b1a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -14,12 +14,17 @@
*/
package org.apache.geode.internal.tcp;
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE;
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.net.UnknownHostException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
@@ -56,26 +61,21 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
* <p>
* TCPConduit manages a server socket and a collection of connections to other systems. Connections
* are identified by DistributedMember IDs. These types of messages are currently supported:
- * </p>
*
- * <pre>
- * <p>
- * DistributionMessage - message is delivered to the server's
- * ServerDelegate
* <p>
- * </pre>
+ * DistributionMessage - message is delivered to the server's ServerDelegate
+ *
* <p>
* In the current implementation, ServerDelegate is the DirectChannel used by the GemFire
* DistributionManager to send and receive messages.
+ *
* <p>
* If the ServerDelegate is null, DistributionMessages are ignored by the TCPConduit.
- * </p>
*
* @since GemFire 2.0
*/
public class TCPConduit implements Runnable {
-
private static final Logger logger = LogService.getLogger();
/**
@@ -93,6 +93,7 @@ public class TCPConduit implements Runnable {
* enough. Setting it too low can also have adverse effects because backlog overflows
* aren't handled well by most tcp/ip implementations, causing connect timeouts instead
* of expected ServerRefusedConnection exceptions.
+ *
* <p>
* Normally the backlog isn't that important because if it's full of connection requests
* a SYN "cookie" mechanism is used to bypass the backlog queue. If this is turned off
@@ -104,45 +105,24 @@ public class TCPConduit implements Runnable {
/**
* use javax.net.ssl.SSLServerSocketFactory?
*/
- boolean useSSL;
+ private final boolean useSSL;
/**
* The socket producer used by the cluster
*/
private final SocketCreator socketCreator;
-
- private Membership membership;
+ private final Membership membership;
static {
init();
}
- public Membership getMembership() {
- return membership;
- }
-
- public static int getBackLog() {
- return BACKLOG;
- }
-
- public static void init() {
- // only use direct buffers if we are using nio
- LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
- // note: bug 37730 concerned this defaulting to 50
- BACKLOG = Integer.getInteger("p2p.backlog", 1280);
- if (Boolean.getBoolean("p2p.oldIO")) {
- logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
- }
- }
-
- ///////////////// permanent conduit state
-
/**
* the size of OS TCP/IP buffers, not set by default
*/
- int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
- int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
+ int tcpBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
+ int idleConnectionTimeout = DEFAULT_SOCKET_LEASE_TIME;
/**
* port is the tcp/ip port that this conduit binds to. If it is zero, a port from
@@ -151,8 +131,8 @@ public class TCPConduit implements Runnable {
*/
private int port;
- private int[] tcpPortRange = new int[] {DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[0],
- DistributionConfig.DEFAULT_MEMBERSHIP_PORT_RANGE[1]};
+ private final int[] tcpPortRange =
+ new int[] {DEFAULT_MEMBERSHIP_PORT_RANGE[0], DEFAULT_MEMBERSHIP_PORT_RANGE[1]};
/**
* The java groups address that this conduit is associated with
@@ -183,14 +163,12 @@ public class TCPConduit implements Runnable {
*/
private DistributionConfig config;
- ////////////////// runtime state that is re-initialized on a restart
-
/**
* server socket address
*/
private InetSocketAddress id;
- protected volatile boolean stopped;
+ private volatile boolean stopped;
/**
* the listener thread
@@ -213,14 +191,20 @@ public class TCPConduit implements Runnable {
private ConnectionTable conTable;
/**
+ * the reason for a shutdown, if abnormal
+ */
+ private volatile Exception shutdownCause;
+
+ private final Stopper stopper = new Stopper();
+
+ /**
* <p>
* creates a new TCPConduit bound to the given InetAddress and port. The given ServerDelegate will
* receive any DistributionMessages passed to the conduit.
- * </p>
+ *
* <p>
* This constructor forces the conduit to ignore the following system properties and look for them
* only in the <i>props</i> argument:
- * </p>
*
* <pre>
* p2p.tcpBufferSize
@@ -234,35 +218,28 @@ public class TCPConduit implements Runnable {
this.address = address;
this.isBindAddress = isBindAddress;
this.port = port;
- this.directChannel = receiver;
- this.stats = null;
- this.config = null;
- this.membership = mgr;
+ directChannel = receiver;
+ stats = null;
+ config = null;
+ membership = mgr;
if (directChannel != null) {
- this.stats = directChannel.getDMStats();
- this.config = directChannel.getDMConfig();
+ stats = directChannel.getDMStats();
+ config = directChannel.getDMConfig();
}
- if (this.getStats() == null) {
- this.stats = new LonerDistributionManager.DummyDMStats();
+ if (getStats() == null) {
+ stats = new LonerDistributionManager.DummyDMStats();
}
- try {
- this.conTable = ConnectionTable.create(this);
- } catch (IOException io) {
- throw new ConnectionException(
- "Unable to initialize connection table",
- io);
- }
+ conTable = ConnectionTable.create(this);
- this.socketCreator =
+ socketCreator =
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER);
- this.useSSL = socketCreator.useSSL();
+ useSSL = socketCreator.useSSL();
- InetAddress addr = address;
- if (addr == null) {
+ if (address == null) {
try {
- addr = SocketCreator.getLocalHost();
- } catch (java.net.UnknownHostException e) {
+ SocketCreator.getLocalHost();
+ } catch (UnknownHostException e) {
throw new ConnectionException("Unable to resolve localHost address", e);
}
}
@@ -270,25 +247,39 @@ public class TCPConduit implements Runnable {
startAcceptor();
}
+ public static void init() {
+ // only use direct buffers if we are using nio
+ LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000);
+ BACKLOG = Integer.getInteger("p2p.backlog", 1280);
+ if (Boolean.getBoolean("p2p.oldIO")) {
+ logger.warn("detected use of p2p.oldIO setting - this is no longer supported");
+ }
+ }
+
+ public Membership getMembership() {
+ return membership;
+ }
+
+ public static int getBackLog() {
+ return BACKLOG;
+ }
/**
* parse instance-level properties from the given object
*/
private void parseProperties(Properties p) {
if (p != null) {
- String s;
- s = p.getProperty("p2p.tcpBufferSize", "" + tcpBufferSize);
+ String s = p.getProperty("p2p.tcpBufferSize", String.valueOf(tcpBufferSize));
try {
tcpBufferSize = Integer.parseInt(s);
} catch (Exception e) {
- logger.warn("exception parsing p2p.tcpBufferSize",
- e);
+ logger.warn("exception parsing p2p.tcpBufferSize", e);
}
if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
// enforce minimum
tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
}
- s = p.getProperty("p2p.idleConnectionTimeout", "" + idleConnectionTimeout);
+ s = p.getProperty("p2p.idleConnectionTimeout", String.valueOf(idleConnectionTimeout));
try {
idleConnectionTimeout = Integer.parseInt(s);
} catch (Exception e) {
@@ -306,27 +297,20 @@ public class TCPConduit implements Runnable {
try {
tcpPortRange[1] = Integer.parseInt(s);
} catch (Exception e) {
- logger.warn("Exception parsing membership-port-range end port.",
- e);
+ logger.warn("Exception parsing membership-port-range end port.", e);
}
-
}
}
/**
- * the reason for a shutdown, if abnormal
- */
- private volatile Exception shutdownCause;
-
- /**
* binds the server socket and gets threads going
*/
private void startAcceptor() throws ConnectionException {
- int localPort;
- int p = this.port;
+ int p = port;
createServerSocket();
+ int localPort;
try {
localPort = socket.getLocalPort();
@@ -349,7 +333,7 @@ public class TCPConduit implements Runnable {
String s = "While creating ServerSocket on port " + p;
throw new ConnectionException(s, io);
}
- this.port = localPort;
+ port = localPort;
}
/**
@@ -357,16 +341,15 @@ public class TCPConduit implements Runnable {
* this.bindAddress, which must be set before invoking this method.
*/
private void createServerSocket() {
- int serverPort = this.port;
+ int serverPort = port;
int connectionRequestBacklog = BACKLOG;
- InetAddress bindAddress = this.address;
+ InetAddress bindAddress = address;
try {
if (serverPort <= 0) {
-
socket = socketCreator.createServerSocketUsingPortRange(bindAddress,
- connectionRequestBacklog, isBindAddress,
- true, 0, tcpPortRange);
+ connectionRequestBacklog, isBindAddress, true, 0, tcpPortRange);
+
} else {
ServerSocketChannel channel = ServerSocketChannel.open();
socket = channel.socket();
@@ -383,8 +366,7 @@ public class TCPConduit implements Runnable {
int newSize = socket.getReceiveBufferSize();
if (newSize != tcpBufferSize) {
logger.info("{} is {} instead of the requested {}",
- "Listener receiverBufferSize", newSize,
- tcpBufferSize);
+ "Listener receiverBufferSize", newSize, tcpBufferSize);
}
} catch (SocketException ex) {
logger.warn("Failed to set listener receiverBufferSize to {}",
@@ -444,7 +426,9 @@ public class TCPConduit implements Runnable {
conTable = null;
}
- /* stops the conduit, closing all tcp/ip connections */
+ /**
+ * stops the conduit, closing all tcp/ip connections
+ */
public void stop(Exception cause) {
if (!stopped) {
stopped = true;
@@ -454,16 +438,15 @@ public class TCPConduit implements Runnable {
logger.trace(LogMarker.DM_VERBOSE, "Shutting down conduit");
}
try {
- // set timeout endpoint here since interrupt() has been known
- // to hang
+ // set timeout endpoint here since interrupt() has been known to hang
long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
- Thread t = this.thread;
+ Thread t = thread;
if (channel != null) {
channel.close();
- // NOTE: do not try to interrupt the listener thread at this point.
- // Doing so interferes with the channel's socket logic.
+ // NOTE: do not try to interrupt the listener thread at this point;
+ // doing so interferes with the channel's socket logic.
} else {
- ServerSocket s = this.socket;
+ ServerSocket s = socket;
if (s != null) {
s.close();
}
@@ -473,7 +456,7 @@ public class TCPConduit implements Runnable {
}
do {
- t = this.thread;
+ t = thread;
if (t == null || !t.isAlive()) {
break;
}
@@ -490,7 +473,7 @@ public class TCPConduit implements Runnable {
}
// close connections after shutting down acceptor to fix bug 30695
- this.conTable.close();
+ conTable.close();
socket = null;
thread = null;
@@ -499,15 +482,6 @@ public class TCPConduit implements Runnable {
}
/**
- * Returns whether or not this conduit is stopped
- *
- * @since GemFire 3.0
- */
- public boolean isStopped() {
- return this.stopped;
- }
-
- /**
* starts the conduit again after it's been stopped. This will clear the server map if the
* conduit's port is zero (wildcard bind)
*/
@@ -515,20 +489,14 @@ public class TCPConduit implements Runnable {
if (!stopped) {
return;
}
- this.stats = null;
+ stats = null;
if (directChannel != null) {
- this.stats = directChannel.getDMStats();
+ stats = directChannel.getDMStats();
}
- if (this.getStats() == null) {
- this.stats = new LonerDistributionManager.DummyDMStats();
- }
- try {
- this.conTable = ConnectionTable.create(this);
- } catch (IOException io) {
- throw new ConnectionException(
- "Unable to initialize connection table",
- io);
+ if (getStats() == null) {
+ stats = new LonerDistributionManager.DummyDMStats();
}
+ conTable = ConnectionTable.create(this);
startAcceptor();
}
@@ -552,9 +520,6 @@ public class TCPConduit implements Runnable {
if (Thread.currentThread().isInterrupted()) {
break;
}
- if (stopper.isCancelInProgress()) {
- break; // part of bug 37271
- }
Socket othersock = null;
try {
@@ -566,6 +531,7 @@ public class TCPConduit implements Runnable {
othersock.close();
}
} catch (Exception e) {
+ // ignored
}
continue;
}
@@ -577,7 +543,7 @@ public class TCPConduit implements Runnable {
} catch (ClosedChannelException | CancelException e) {
break;
} catch (IOException e) {
- this.getStats().incFailedAccept();
+ getStats().incFailedAccept();
try {
if (othersock != null) {
@@ -589,7 +555,7 @@ public class TCPConduit implements Runnable {
if (!stopped) {
if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
- // safe to ignore; see bug 31156
+ // safe to ignore
if (!socket.isClosed()) {
logger.warn("ServerSocket threw 'socket closed' exception but says it is not closed",
e);
@@ -597,8 +563,7 @@ public class TCPConduit implements Runnable {
socket.close();
createServerSocket();
} catch (IOException ioe) {
- logger.fatal("Unable to close and recreate server socket",
- ioe);
+ logger.fatal("Unable to close and recreate server socket", ioe);
// post 5.1.0x, this should force shutdown
try {
Thread.sleep(5000);
@@ -621,8 +586,8 @@ public class TCPConduit implements Runnable {
}
if (!stopped && socket.isClosed()) {
- // NOTE: do not check for distributed system closing here. Messaging
- // may need to occur during the closing of the DS or cache
+ // NOTE: do not check for distributed system closing here.
+ // Messaging may need to occur during the closing of the DS or cache
logger.warn("ServerSocket closed - reopening");
try {
createServerSocket();
@@ -630,7 +595,7 @@ public class TCPConduit implements Runnable {
logger.warn(ex.getMessage(), ex);
}
}
- } // for
+ }
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace("Stopped P2P Listener on {}", id);
@@ -638,29 +603,29 @@ public class TCPConduit implements Runnable {
}
private ConnectionTable getConTable() {
- ConnectionTable result = this.conTable;
+ ConnectionTable result = conTable;
if (result == null) {
stopper.checkCancelInProgress(null);
- throw new DistributedSystemDisconnectedException(
- "tcp layer has been shutdown");
+ throw new DistributedSystemDisconnectedException("tcp layer has been shutdown");
}
return result;
}
- private void acceptConnection(Socket othersock) {
+ private void acceptConnection(Socket otherSocket) {
try {
- getConTable().acceptConnection(othersock, new PeerConnectionFactory());
+ getConTable().acceptConnection(otherSocket, new PeerConnectionFactory());
} catch (IOException | ConnectionException io) {
// exception is logged by the Connection
if (!stopped) {
- this.getStats().incFailedAccept();
+ getStats().incFailedAccept();
}
} catch (CancelException e) {
+ // ignored
} catch (Exception e) {
if (!stopped) {
- this.getStats().incFailedAccept();
+ getStats().incFailedAccept();
logger.warn("Failed to accept connection from {} because {}",
- othersock.getInetAddress(), e);
+ otherSocket.getInetAddress(), e);
}
}
}
@@ -690,17 +655,16 @@ public class TCPConduit implements Runnable {
*
* @param bytesRead number of bytes read off of network to get this message
*/
- protected void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
+ void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
if (logger.isTraceEnabled()) {
logger.trace("{} received {} from {}", id, message, receiver);
}
if (directChannel != null) {
- DistributionMessage msg = message;
- msg.setBytesRead(bytesRead);
- msg.setSender(receiver.getRemoteAddress());
- msg.setSharedReceiver(receiver.isSharedResource());
- directChannel.receive(msg, bytesRead);
+ message.setBytesRead(bytesRead);
+ message.setSender(receiver.getRemoteAddress());
+ message.setSharedReceiver(receiver.isSharedResource());
+ directChannel.receive(message, bytesRead);
}
}
@@ -722,7 +686,7 @@ public class TCPConduit implements Runnable {
* Gets the local member ID that identifies this conduit
*/
public InternalDistributedMember getMemberId() {
- return this.localAddr;
+ return localAddr;
}
public void setMemberId(InternalDistributedMember addr) {
@@ -733,8 +697,6 @@ public class TCPConduit implements Runnable {
return config;
}
-
-
/**
* Return a connection to the given member. This method must continue to attempt to create a
* connection to the given member as long as that member is in the membership view and the system
@@ -752,30 +714,26 @@ public class TCPConduit implements Runnable {
*/
public Connection getConnection(InternalDistributedMember memberAddress,
final boolean preserveOrder, boolean retry, long startTime, long ackTimeout,
- long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException {
+ long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
if (stopped) {
- throw new DistributedSystemDisconnectedException(
- "The conduit is stopped");
+ throw new DistributedSystemDisconnectedException("The conduit is stopped");
}
- Connection conn = null;
InternalDistributedMember memberInTrouble = null;
- boolean breakLoop = false;
- for (;;) {
+ Connection conn = null;
+ for (boolean breakLoop = false;;) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
- // If this is the second time through this loop, we had
- // problems. Tear down the connection so that it gets
- // rebuilt.
+ // If this is the second time through this loop, we had problems.
+ // Tear down the connection so that it gets rebuilt.
if (retry || conn != null) { // not first time in loop
if (!membership.memberExists(memberAddress)
|| membership.isShunned(memberAddress)
|| membership.shutdownInProgress()) {
- throw new IOException(
- "TCP/IP connection lost and member is not in view");
+ throw new IOException("TCP/IP connection lost and member is not in view");
}
- // bug35953: Member is still in view; we MUST NOT give up!
+ // Member is still in view; we MUST NOT give up!
// Pause just a tiny bit...
try {
@@ -789,8 +747,7 @@ public class TCPConduit implements Runnable {
if (!membership.memberExists(memberAddress)
|| membership.isShunned(memberAddress)) {
// OK, the member left. Just register an error.
- throw new IOException(
- "TCP/IP connection lost and member is not in view");
+ throw new IOException("TCP/IP connection lost and member is not in view");
}
// Print a warning (once)
@@ -804,7 +761,7 @@ public class TCPConduit implements Runnable {
}
// Close the connection (it will get rebuilt later).
- this.getStats().incReconnectAttempts();
+ getStats().incReconnectAttempts();
if (conn != null) {
try {
if (logger.isDebugEnabled()) {
@@ -815,6 +772,7 @@ public class TCPConduit implements Runnable {
} catch (CancelException ex) {
throw ex;
} catch (Exception ex) {
+ // ignored
}
}
} // not first time in loop
@@ -822,8 +780,7 @@ public class TCPConduit implements Runnable {
Exception problem = null;
try {
// Get (or regenerate) the connection
- // bug36202: this could generate a ConnectionException, so it
- // must be caught and retried
+ // this could generate a ConnectionException, so it must be caught and retried
boolean retryForOldConnection;
boolean debugRetry = false;
do {
@@ -855,13 +812,13 @@ public class TCPConduit implements Runnable {
// Race condition between acquiring the connection and attempting
// to use it: another thread closed it.
problem = e;
- // [sumedh] No need to retry since Connection.createSender has already
+ // No need to retry since Connection.createSender has already
// done retries and now member is really unreachable for some reason
// even though it may be in the view
breakLoop = true;
} catch (IOException e) {
problem = e;
- // bug #43962 don't keep trying to connect to an alert listener
+ // don't keep trying to connect to an alert listener
if (AlertingAction.isThreadAlerting()) {
if (logger.isDebugEnabled()) {
logger.debug("Giving up connecting to alert listener {}", memberAddress);
@@ -877,11 +834,10 @@ public class TCPConduit implements Runnable {
// Bracket our original warning
if (memberInTrouble != null) {
// make this msg info to bracket warning
- logger.info("Ending reconnect attempt because {} has disappeared.",
- memberInTrouble);
+ logger.info("Ending reconnect attempt because {} has disappeared.", memberInTrouble);
}
- throw new IOException(String.format("Peer has disappeared from view: %s",
- memberAddress));
+ throw new IOException(
+ String.format("Peer has disappeared from view: %s", memberAddress));
} // left the view
if (membership.shutdownInProgress()) { // shutdown in progress
@@ -899,8 +855,7 @@ public class TCPConduit implements Runnable {
// Log the warning. We wait until now, because we want
// to have m defined for a nice message...
if (memberInTrouble == null) {
- logger.warn("Error sending message to {} (will reattempt): {}",
- memberAddress, problem);
+ logger.warn("Error sending message to {} (will reattempt): {}", memberAddress, problem);
memberInTrouble = memberAddress;
} else {
if (logger.isDebugEnabled()) {
@@ -910,21 +865,17 @@ public class TCPConduit implements Runnable {
if (breakLoop) {
if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
- logger.warn("Throwing IOException after finding breakLoop=true",
- problem);
+ logger.warn("Throwing IOException after finding breakLoop=true", problem);
}
if (problem instanceof IOException) {
throw (IOException) problem;
- } else {
- IOException ioe = new IOException(String.format("Problem connecting to %s",
- memberAddress));
- ioe.initCause(problem);
- throw ioe;
}
+ throw new IOException(
+ String.format("Problem connecting to %s", memberAddress), problem);
}
// Retry the operation (indefinitely)
continue;
- } // problem != null
+ }
// Success!
// Make sure our logging is bracketed if there was a problem
@@ -940,12 +891,12 @@ public class TCPConduit implements Runnable {
Thread.currentThread().interrupt();
}
}
- } // for(;;)
+ }
}
@Override
public String toString() {
- return "" + id;
+ return String.valueOf(id);
}
/**
@@ -956,7 +907,7 @@ public class TCPConduit implements Runnable {
}
public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
- ConnectionTable ct = this.conTable;
+ ConnectionTable ct = conTable;
if (ct == null) {
return;
}
@@ -967,8 +918,8 @@ public class TCPConduit implements Runnable {
* check to see if there are still any receiver threads for the given end-point
*/
public boolean hasReceiversFor(DistributedMember endPoint) {
- ConnectionTable ct = this.conTable;
- return (ct != null) && ct.hasReceiversFor(endPoint);
+ ConnectionTable ct = conTable;
+ return ct != null && ct.hasReceiversFor(endPoint);
}
/**
@@ -983,10 +934,38 @@ public class TCPConduit implements Runnable {
}
public BufferPool getBufferPool() {
- return this.conTable.getBufferPool();
+ return conTable.getBufferPool();
+ }
+
+ public CancelCriterion getCancelCriterion() {
+ return stopper;
+ }
+
+ /**
+ * if the conduit is disconnected due to an abnormal condition, this will describe the reason
+ *
+ * @return exception that caused disconnect
+ */
+ Exception getShutdownCause() {
+ return shutdownCause;
+ }
+
+ /**
+ * returns the SocketCreator that should be used to produce sockets for TCPConduit connections.
+ */
+ protected SocketCreator getSocketCreator() {
+ return socketCreator;
}
- protected class Stopper extends CancelCriterion {
+ /**
+ * Called by Connection before handshake reply is sent. Returns true if member is part of
+ * view, false if membership is not confirmed before timeout.
+ */
+ boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
+ return membership.waitForNewMember(remoteId);
+ }
+
+ private class Stopper extends CancelCriterion {
@Override
public String cancelInProgress() {
@@ -994,7 +973,7 @@ public class TCPConduit implements Runnable {
if (dm == null) {
return "no distribution manager";
}
- if (TCPConduit.this.stopped) {
+ if (stopped) {
return "Conduit has been stopped";
}
return null;
@@ -1015,40 +994,8 @@ public class TCPConduit implements Runnable {
return result;
}
// We know we've been stopped; generate the exception
- result = new DistributedSystemDisconnectedException("Conduit has been stopped");
- result.initCause(e);
+ result = new DistributedSystemDisconnectedException("Conduit has been stopped", e);
return result;
}
}
-
- private final Stopper stopper = new Stopper();
-
- public CancelCriterion getCancelCriterion() {
- return stopper;
- }
-
-
- /**
- * if the conduit is disconnected due to an abnormal condition, this will describe the reason
- *
- * @return exception that caused disconnect
- */
- public Exception getShutdownCause() {
- return this.shutdownCause;
- }
-
- /**
- * returns the SocketCreator that should be used to produce sockets for TCPConduit connections.
- */
- protected SocketCreator getSocketCreator() {
- return socketCreator;
- }
-
- /**
- * ARB: Called by Connection before handshake reply is sent. Returns true if member is part of
- * view, false if membership is not confirmed before timeout.
- */
- public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
- return membership.waitForNewMember(remoteId);
- }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
deleted file mode 100755
index d303090..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.tcp;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.nio.channels.SocketChannel;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.alerting.internal.spi.AlertingAction;
-import org.apache.geode.distributed.internal.DMStats;
-import org.apache.geode.distributed.internal.Distribution;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.internal.net.SocketCreator;
-import org.apache.geode.test.junit.categories.MembershipTest;
-
-@Category({MembershipTest.class})
-public class ConnectionJUnitTest {
-
- /**
- * Test whether suspicion is raised about a member that closes its shared/unordered TCPConduit
- * connection
- */
- @Test
- public void testSuspicionRaised() throws Exception {
- // this test has to create a lot of mocks because Connection
- // uses a lot of objects
-
- // mock the socket
- ConnectionTable table = mock(ConnectionTable.class);
- DistributionManager distMgr = mock(DistributionManager.class);
- Distribution membership = mock(Distribution.class);
- TCPConduit conduit = mock(TCPConduit.class);
- DMStats stats = mock(DMStats.class);
-
- // mock the connection table and conduit
-
- when(table.getConduit()).thenReturn(conduit);
- when(table.getBufferPool()).thenReturn(new BufferPool(stats));
-
- CancelCriterion stopper = mock(CancelCriterion.class);
- when(stopper.cancelInProgress()).thenReturn(null);
- when(conduit.getCancelCriterion()).thenReturn(stopper);
-
- when(conduit.getSocketId())
- .thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 10337));
-
- // mock the distribution manager and membership manager
- when(distMgr.getDistribution()).thenReturn(membership);
- when(conduit.getDM()).thenReturn(distMgr);
- when(conduit.getStats()).thenReturn(stats);
- when(table.getDM()).thenReturn(distMgr);
- SocketCloser closer = mock(SocketCloser.class);
- when(table.getSocketCloser()).thenReturn(closer);
-
- SocketChannel channel = SocketChannel.open();
-
- Connection conn = new Connection(table, channel.socket());
- conn.setSharedUnorderedForTest();
- conn.run();
- verify(membership).suspectMember(isNull(InternalDistributedMember.class), any(String.class));
- }
-
- @Test
- public void connectTimeoutIsShortWhenAlerting() throws UnknownHostException {
- ConnectionTable table = mock(ConnectionTable.class);
- TCPConduit conduit = mock(TCPConduit.class);
- when(table.getConduit()).thenReturn(conduit);
- when(conduit.getSocketId())
- .thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 12345));
- DistributionConfig config = mock(DistributionConfig.class);
- when(config.getMemberTimeout()).thenReturn(100);
- Connection connection = new Connection(table, mock(Socket.class));
- int normalTimeout = connection.getP2PConnectTimeout(config);
- assertThat(normalTimeout).isEqualTo(600);
- AlertingAction.execute(() -> {
- assertThat(connection.getP2PConnectTimeout(config)).isEqualTo(100);
- });
-
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
index ff46493..06112f7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTableTest.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.tcp;
import static org.junit.Assert.assertEquals;
@@ -34,9 +33,9 @@ import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.test.junit.categories.MembershipTest;
-
-@Category({MembershipTest.class})
+@Category(MembershipTest.class)
public class ConnectionTableTest {
+
private ConnectionTable connectionTable;
private Socket socket;
private PeerConnectionFactory factory;
@@ -71,8 +70,8 @@ public class ConnectionTableTest {
@Test
public void testConnectionsClosedDuringCreateAreNotAddedAsReceivers() throws Exception {
when(connection.isReceiverStopped()).thenReturn(false);
- when(connection.isSocketClosed()).thenReturn(true); // Pretend this closed as soon at it was
- // created
+ // Pretend this closed as soon as it was created
+ when(connection.isSocketClosed()).thenReturn(true);
connectionTable.acceptConnection(socket, factory);
assertEquals(0, connectionTable.getNumberOfReceivers());
@@ -80,9 +79,11 @@ public class ConnectionTableTest {
@Test
public void testThreadStoppedNotAddedAsReceivers() throws Exception {
- when(connection.isSocketClosed()).thenReturn(false); // connection is not closed
+ // connection is not closed
+ when(connection.isSocketClosed()).thenReturn(false);
- when(connection.isReceiverStopped()).thenReturn(true);// but receiver is stopped
+ // but receiver is stopped
+ when(connection.isReceiverStopped()).thenReturn(true);
connectionTable.acceptConnection(socket, factory);
assertEquals(0, connectionTable.getNumberOfReceivers());
@@ -90,19 +91,20 @@ public class ConnectionTableTest {
@Test
public void testSocketNotClosedAddedAsReceivers() throws Exception {
- when(connection.isSocketClosed()).thenReturn(false);// connection is not closed
+ // connection is not closed
+ when(connection.isSocketClosed()).thenReturn(false);
connectionTable.acceptConnection(socket, factory);
assertEquals(1, connectionTable.getNumberOfReceivers());
}
@Test
- public void testThreadOwnedSocketsAreRemoved() throws Exception {
+ public void testThreadOwnedSocketsAreRemoved() {
Boolean wantsResources = ConnectionTable.getThreadOwnsResourcesRegistration();
ConnectionTable.threadWantsOwnResources();
try {
Map<DistributedMember, Connection> threadConnectionMap = new HashMap<>();
- connectionTable.threadOrderedConnMap.set(threadConnectionMap);
+ ConnectionTable.threadOrderedConnMap.set(threadConnectionMap);
ConnectionTable.releaseThreadsSockets();
assertEquals(0, threadConnectionMap.size());
} finally {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
index 77160c8..a5dfec8 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java
@@ -14,24 +14,40 @@
*/
package org.apache.geode.internal.tcp;
+import static org.apache.geode.internal.net.SocketCreator.getLocalHost;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.alerting.internal.spi.AlertingAction;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
import org.apache.geode.test.junit.categories.MembershipTest;
-@Category({MembershipTest.class})
+@Category(MembershipTest.class)
public class ConnectionTest {
@Test
- public void shouldBeMockable() throws Exception {
+ public void canBeMocked() throws Exception {
Connection mockConnection = mock(Connection.class);
SocketChannel channel = null;
ByteBuffer buffer = null;
@@ -43,4 +59,58 @@ public class ConnectionTest {
verify(mockConnection, times(1)).writeFully(channel, buffer, forceAsync,
mockDistributionMessage);
}
+
+ /**
+ * Test whether suspicion is raised about a member that closes its shared/unordered TCPConduit
+ * connection
+ */
+ @Test
+ public void testSuspicionRaised() throws Exception {
+ ConnectionTable connectionTable = mock(ConnectionTable.class);
+ Distribution distribution = mock(Distribution.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ DMStats dmStats = mock(DMStats.class);
+ CancelCriterion stopper = mock(CancelCriterion.class);
+ SocketCloser socketCloser = mock(SocketCloser.class);
+ TCPConduit tcpConduit = mock(TCPConduit.class);
+
+ when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+ when(connectionTable.getConduit()).thenReturn(tcpConduit);
+ when(connectionTable.getDM()).thenReturn(distributionManager);
+ when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+ when(distributionManager.getDistribution()).thenReturn(distribution);
+ when(stopper.cancelInProgress()).thenReturn(null);
+ when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+ when(tcpConduit.getDM()).thenReturn(distributionManager);
+ when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 10337));
+ when(tcpConduit.getStats()).thenReturn(dmStats);
+
+ SocketChannel channel = SocketChannel.open();
+
+ Connection connection = new Connection(connectionTable, channel.socket());
+ connection.setSharedUnorderedForTest();
+ connection.run();
+
+ verify(distribution).suspectMember(isNull(), anyString());
+ }
+
+ @Test
+ public void connectTimeoutIsShortWhenAlerting() throws UnknownHostException {
+ ConnectionTable connectionTable = mock(ConnectionTable.class);
+ DistributionConfig distributionConfig = mock(DistributionConfig.class);
+ TCPConduit tcpConduit = mock(TCPConduit.class);
+
+ when(connectionTable.getConduit()).thenReturn(tcpConduit);
+ when(distributionConfig.getMemberTimeout()).thenReturn(100);
+ when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));
+
+ Connection connection = new Connection(connectionTable, mock(Socket.class));
+
+ int normalTimeout = connection.getP2PConnectTimeout(distributionConfig);
+ assertThat(normalTimeout).isEqualTo(600);
+
+ AlertingAction.execute(() -> {
+ assertThat(connection.getP2PConnectTimeout(distributionConfig)).isEqualTo(100);
+ });
+ }
}