You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2017/04/05 22:31:43 UTC
[1/2] geode git commit: GEODE-2684 Connection & ConnectionTable cleanup
Repository: geode
Updated Branches:
refs/heads/develop 39c72b204 -> 6b2b7b2f7
GEODE-2684 Connection & ConnectionTable cleanup
Removed dead code and replaced indirect access to the TCPConduit through the
connection table with a direct reference held by the connection.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6b2b7b2f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6b2b7b2f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6b2b7b2f
Branch: refs/heads/develop
Commit: 6b2b7b2f7f3f63b8ae638e9afffa5edc0f763783
Parents: 391502a
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 15:13:01 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700
----------------------------------------------------------------------
.../apache/geode/internal/tcp/Connection.java | 188 ++++++-------------
.../geode/internal/tcp/ConnectionTable.java | 68 ++-----
.../geode/internal/tcp/DirectReplySender.java | 2 +-
.../apache/geode/internal/tcp/MsgReader.java | 2 +-
.../apache/geode/internal/tcp/NIOMsgReader.java | 2 +-
5 files changed, 82 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index a0af245..c57a0ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -72,8 +72,6 @@ public class Connection implements Runnable {
public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg
public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks
public final static int DIRECT_ACK_BIT = 0x20;
- // We no longer support early ack
- // public final static int EARLY_ACK_BIT = 0x10;
public static final int MSG_HEADER_SIZE_OFFSET = 0;
public static final int MSG_HEADER_TYPE_OFFSET = 4;
@@ -95,7 +93,9 @@ public class Connection implements Runnable {
"member unexpectedly shut down shared, unordered connection";
/** the table holding this connection */
- final ConnectionTable owner;
+ private final ConnectionTable owner;
+
+ private final TCPConduit conduit;
/**
* Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader
@@ -113,15 +113,6 @@ public class Connection implements Runnable {
/** The idle timeout timer task for this connection */
private SystemTimerTask idleTask;
- /**
- * Returns the depth of unshared reader threads from this thread to the original
- * non-reader-thread. E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) ->
- * reader(domino=3)
- */
- public static int getDominoCount() {
- return dominoCount.get().intValue();
- }
-
private final static ThreadLocal isReaderThread = new ThreadLocal();
public final static void makeReaderThread() {
@@ -129,7 +120,7 @@ public class Connection implements Runnable {
makeReaderThread(true);
}
- private final static void makeReaderThread(boolean v) {
+ private static void makeReaderThread(boolean v) {
isReaderThread.set(v);
}
@@ -150,7 +141,7 @@ public class Connection implements Runnable {
if (connectTimeoutStr != null) {
P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
} else {
- P2P_CONNECT_TIMEOUT = 6 * this.owner.owner.getDM().getConfig().getMemberTimeout();
+ P2P_CONNECT_TIMEOUT = 6 * this.conduit.getDM().getConfig().getMemberTimeout();
}
IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
return P2P_CONNECT_TIMEOUT;
@@ -367,20 +358,18 @@ public class Connection implements Runnable {
/** the buffer used for NIO message receipt */
ByteBuffer nioInputBuffer;
- /** the position of the next message's content */
- // int nioMessageStart;
-
/** the length of the next message to be dispatched */
int nioMessageLength;
- // byte nioMessageVersion;
/** the type of message being received */
byte nioMessageType;
/** used to lock access to destreamer data */
private final Object destreamerLock = new Object();
+
/** caches a msg destreamer that is currently not being used */
MsgDestreamer idleMsgDestreamer;
+
/**
* used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages using
* nio
@@ -409,8 +398,6 @@ public class Connection implements Runnable {
private int sendBufferSize = -1;
private int recvBufferSize = -1;
- private ReplySender replySender;
-
private void setSendBufferSize(Socket sock) {
setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
}
@@ -541,6 +528,7 @@ public class Connection implements Runnable {
throw new IllegalArgumentException(
LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString());
}
+ this.conduit = t.getConduit();
this.isReceiver = true;
this.owner = t;
this.socket = socket;
@@ -628,7 +616,7 @@ public class Connection implements Runnable {
bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff);
bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
- bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff);
+ bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
int allocSize = bytes.length;
@@ -707,19 +695,16 @@ public class Connection implements Runnable {
my_okHandshakeBytes = okHandshakeBytes;
}
if (useNIO()) {
+ assert my_okHandshakeBuf != null;
synchronized (my_okHandshakeBuf) {
my_okHandshakeBuf.position(0);
nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null);
}
} else {
synchronized (outLock) {
- try {
- // this.writerThread = Thread.currentThread();
- this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
- this.output.flush();
- } finally {
- // this.writerThread = null;
- }
+ assert my_okHandshakeBytes != null;
+ this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
+ this.output.flush();
}
}
}
@@ -832,7 +817,7 @@ public class Connection implements Runnable {
/**
* asynchronously close this connection
*
- * @param beingSick
+ * @param beingSick test hook to simulate sickness in communications & membership
*/
private void asyncClose(boolean beingSick) {
// note: remoteAddr may be null if this is a receiver that hasn't finished its handshake
@@ -890,8 +875,7 @@ public class Connection implements Runnable {
InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
- // connectHandshake.reset();
- /**
+ /*
* Note a byte of zero is always written because old products serialized a member id with always
* sends an ip address. My reading of the ip-address specs indicated that the first byte of a
* valid address would never be 0.
@@ -925,8 +909,6 @@ public class Connection implements Runnable {
private void handshakeStream() throws IOException {
waitForAddressCompletion();
- // this.output = new BufferedOutputStream(getSocket().getOutputStream(),
- // owner.getConduit().bufferSize);
this.output = getSocket().getOutputStream();
ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
DataOutputStream os = new DataOutputStream(baos);
@@ -961,17 +943,12 @@ public class Connection implements Runnable {
lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff);
lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff);
lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE;
- lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff);
+ lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff);
lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
synchronized (outLock) {
- try {
- // this.writerThread = Thread.currentThread();
- this.output.write(lenbytes, 0, lenbytes.length);
- this.output.write(msg, 0, msg.length);
- this.output.flush();
- } finally {
- // this.writerThread = null;
- }
+ this.output.write(lenbytes, 0, lenbytes.length);
+ this.output.write(msg, 0, msg.length);
+ this.output.flush();
}
}
@@ -1091,7 +1068,7 @@ public class Connection implements Runnable {
// create connection
try {
conn = null;
- conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource);
+ conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
} catch (javax.net.ssl.SSLHandshakeException se) {
// no need to retry if certificates were rejected
throw se;
@@ -1206,7 +1183,7 @@ public class Connection implements Runnable {
private void setRemoteAddr(DistributedMember m) {
this.remoteAddr = this.owner.getDM().getCanonicalId(m);
- MembershipManager mgr = this.owner.owner.getMembershipManager();
+ MembershipManager mgr = this.conduit.getMembershipManager();
mgr.addSurpriseMember(m);
}
@@ -1214,9 +1191,8 @@ public class Connection implements Runnable {
* creates a new connection to a remote server. We are initiating this connection; the other side
* must accept us We will almost always send messages; small acks are received.
*/
- private Connection(MembershipManager mgr, ConnectionTable t, boolean preserveOrder,
- DistributedMember remoteID, boolean sharedResource)
- throws IOException, DistributedSystemDisconnectedException {
+ private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID,
+ boolean sharedResource) throws IOException, DistributedSystemDisconnectedException {
// initialize a socket upfront. So that the
InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID;
@@ -1224,6 +1200,7 @@ public class Connection implements Runnable {
throw new IllegalArgumentException(
LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
}
+ this.conduit = t.getConduit();
this.isReceiver = false;
this.owner = t;
this.sharedResource = sharedResource;
@@ -1248,7 +1225,7 @@ public class Connection implements Runnable {
channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
- /**
+ /*
* If conserve-sockets is false, the socket can be used for receiving responses, so set the
* receive buffer accordingly.
*/
@@ -1261,7 +1238,7 @@ public class Connection implements Runnable {
setSendBufferSize(channel.socket());
channel.configureBlocking(true);
- int connectTime = getP2PConnectTimeout();;
+ int connectTime = getP2PConnectTimeout();
try {
channel.socket().connect(addr, connectTime);
@@ -1276,7 +1253,7 @@ public class Connection implements Runnable {
Thread.currentThread().interrupt();
}
throw c;
- } catch (CancelledKeyException e) {
+ } catch (CancelledKeyException | ClosedSelectorException e) {
// bug #44469: for some reason NIO throws this runtime exception
// instead of an IOException on timeouts
ConnectException c = new ConnectException(
@@ -1284,14 +1261,6 @@ public class Connection implements Runnable {
.toLocalizedString(new Object[] {connectTime}));
c.initCause(e);
throw c;
- } catch (ClosedSelectorException e) {
- // bug #44808: for some reason JRockit NIO thorws this runtime exception
- // instead of an IOException on timeouts
- ConnectException c = new ConnectException(
- LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
- .toLocalizedString(new Object[] {connectTime}));
- c.initCause(e);
- throw c;
}
} finally {
this.owner.removeConnectingSocket(channel.socket());
@@ -1309,7 +1278,6 @@ public class Connection implements Runnable {
setSocketBufferSize(this.socket, false, socketBufferSize, true);
setSendBufferSize(this.socket);
} else {
- // socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort());
Socket s = new Socket();
this.socket = s;
s.setTcpNoDelay(true);
@@ -1335,13 +1303,12 @@ public class Connection implements Runnable {
* must not be doing it correctly.
*/
private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends");
- protected static final int BATCH_BUFFER_SIZE =
+ private static final int BATCH_BUFFER_SIZE =
Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue();
- protected static final int BATCH_FLUSH_MS =
- Integer.getInteger("p2p.batchFlushTime", 50).intValue();
- protected Object batchLock;
- protected ByteBuffer fillBatchBuffer;
- protected ByteBuffer sendBatchBuffer;
+ private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue();
+ private Object batchLock;
+ private ByteBuffer fillBatchBuffer;
+ private ByteBuffer sendBatchBuffer;
private BatchBufferFlusher batchFlusher;
private void createBatchSendBuffer() {
@@ -1446,13 +1413,7 @@ public class Connection implements Runnable {
SocketChannel channel = getSocket().getChannel();
nioWriteFully(channel, sendBatchBuffer, false, null);
sendBatchBuffer.clear();
- } catch (IOException ex) {
- logger.fatal(LocalizedMessage.create(
- LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
- readerShuttingDown = true;
- requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0
- .toLocalizedString(ex));
- } catch (ConnectionException ex) {
+ } catch (IOException | ConnectionException ex) {
logger.fatal(LocalizedMessage.create(
LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
readerShuttingDown = true;
@@ -1526,13 +1487,6 @@ public class Connection implements Runnable {
return this.closing.get();
}
- /**
- * Used to close a connection that has not yet been registered with the distribution manager.
- */
- void closePartialConnect(String reason) {
- close(reason, false, false, false, false);
- }
-
void closePartialConnect(String reason, boolean beingSick) {
close(reason, false, false, beingSick, false);
}
@@ -1619,9 +1573,9 @@ public class Connection implements Runnable {
// if network partition detection is enabled or this is an admin vm
// we can't wait for the reader thread when running in an IBM JRE. See
// bug 41889
- if (this.owner.owner.config.getEnableNetworkPartitionDetection()
- || this.owner.owner.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE
- || this.owner.owner.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
+ if (this.conduit.config.getEnableNetworkPartitionDetection()
+ || this.conduit.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE
+ || this.conduit.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
}
{
@@ -1673,6 +1627,7 @@ public class Connection implements Runnable {
}
}
} else {
+ // noinspection ConstantConditions
this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this);
}
} else if (!this.isReceiver) {
@@ -1735,7 +1690,7 @@ public class Connection implements Runnable {
initiateSuspicionIfSharedUnordered();
if (this.isReceiver) {
if (!this.sharedResource) {
- this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
+ this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
}
asyncClose(false);
this.owner.removeAndCloseThreadOwnedSockets();
@@ -1759,7 +1714,7 @@ public class Connection implements Runnable {
}
private String p2pReaderName() {
- StringBuffer sb = new StringBuffer(64);
+ StringBuilder sb = new StringBuilder(64);
if (this.isReceiver) {
sb.append("P2P message reader@");
} else {
@@ -1973,8 +1928,8 @@ public class Connection implements Runnable {
}
msg = msg.toLowerCase();
- return (msg.indexOf("forcibly closed") >= 0) || (msg.indexOf("reset by peer") >= 0)
- || (msg.indexOf("connection reset") >= 0);
+ return (msg.contains("forcibly closed")) || (msg.contains("reset by peer"))
+ || (msg.contains("connection reset"));
}
private static boolean validMsgType(int msgType) {
@@ -2012,7 +1967,7 @@ public class Connection implements Runnable {
this.idleMsgDestreamer = null;
} else {
result = new MsgDestreamer(this.owner.getConduit().stats,
- this.owner.owner.getCancelCriterion(), v);
+ this.conduit.getCancelCriterion(), v);
}
result.setName(p2pReaderName() + " msgId=" + msgId);
this.destreamerMap.put(key, result);
@@ -2103,7 +2058,7 @@ public class Connection implements Runnable {
/* byte msgHdrVersion = */ calcHdrVersion(len);
len = calcMsgByteSize(len);
int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
- short msgId = (short) ((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff * 0x100)
+ short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
+ (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
if (myDirectAck) {
@@ -2384,7 +2339,7 @@ public class Connection implements Runnable {
// logger.fine("thread-owned receiver with domino count of " + dominoNumber + "
// will prefer shared sockets");
}
- this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber);
+ this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
}
if (logger.isDebugEnabled()) {
@@ -2541,11 +2496,6 @@ public class Connection implements Runnable {
}
bytesSoFar += bytesThisTime;
} catch (InterruptedIOException io) {
- // try { Thread.sleep(10); }
- // catch (InterruptedException ie) {
- // Thread.currentThread().interrupt();
- // }
-
// Current thread has been interrupted. Regard it similar to an EOF
this.readerShuttingDown = true;
try {
@@ -2582,7 +2532,7 @@ public class Connection implements Runnable {
final boolean origSocketInUse = this.socketInUse;
byte originalState = -1;
synchronized (stateLock) {
- originalState = this.connectionState;;
+ originalState = this.connectionState;
this.connectionState = STATE_SENDING;
}
this.socketInUse = true;
@@ -2597,13 +2547,8 @@ public class Connection implements Runnable {
} else {
byte[] bytesToWrite = getBytesToWrite(buffer);
synchronized (outLock) {
- try {
- // this.writerThread = Thread.currentThread();
- this.output.write(bytesToWrite);
- this.output.flush();
- } finally {
- // this.writerThread = null;
- }
+ this.output.write(bytesToWrite);
+ this.output.flush();
}
}
}
@@ -2763,15 +2708,7 @@ public class Connection implements Runnable {
return bytesToWrite;
}
- // private String socketInfo() {
- // return (" socket: " + getSocket().getLocalAddress() + ":" + getSocket().getLocalPort() + " -> "
- // +
- // getSocket().getInetAddress() + ":" + getSocket().getPort() + " connection = " +
- // System.identityHashCode(this));
- //
- // }
-
- private final boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
+ private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force)
throws ConnectionException {
final DMStats stats = this.owner.getConduit().stats;
long start = DistributionStats.getStatTime();
@@ -2891,7 +2828,7 @@ public class Connection implements Runnable {
*
* @throws ConnectionException if the conduit has stopped
*/
- private final boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg)
+ private boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg)
throws ConnectionException {
if (!addToQueue(buffer, msg, true)) {
return false;
@@ -2931,7 +2868,7 @@ public class Connection implements Runnable {
this.pusherThread.start();
}
- private final ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
+ private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
ByteBuffer result = null;
final DMStats stats = this.owner.getConduit().stats;
long start = DistributionStats.getStatTime();
@@ -3152,7 +3089,7 @@ public class Connection implements Runnable {
* Return false if socket writes to be done async/nonblocking Return true if socket writes to be
* done sync/blocking
*/
- private final boolean useSyncWrites(boolean forceAsync) {
+ private boolean useSyncWrites(boolean forceAsync) {
if (forceAsync) {
return false;
}
@@ -3185,7 +3122,7 @@ public class Connection implements Runnable {
static private final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 2)
- private final void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
+ private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync,
DistributionMessage p_msg, final DMStats stats) throws IOException {
DistributionMessage msg = p_msg;
// async/non-blocking
@@ -3301,7 +3238,7 @@ public class Connection implements Runnable {
if (msToWait <= 0) {
Thread.yield();
} else {
- boolean interrupted = Thread.interrupted();;
+ boolean interrupted = Thread.interrupted();
try {
Thread.sleep(msToWait);
} catch (InterruptedException ex) {
@@ -3401,10 +3338,10 @@ public class Connection implements Runnable {
/**
* stateLock is used to synchronize state changes.
*/
- protected Object stateLock = new Object();
+ private final Object stateLock = new Object();
/** for timeout processing, this is the current state of the connection */
- protected byte connectionState = STATE_IDLE;
+ private byte connectionState = STATE_IDLE;
/* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
/** the connection is idle, but may be in use */
@@ -3420,16 +3357,11 @@ public class Connection implements Runnable {
/** the connection is in use and is reading a message */
protected static final byte STATE_READING = 5;
- protected static final String[] STATE_NAMES =
- new String[] {"idle", "sending", "post_sending", "reading_ack", "received_ack", "reading"};
/* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
/** set to true if we exceeded the ack-wait-threshold waiting for a response */
protected volatile boolean ackTimedOut;
- private static int ACK_SIZE = 1;
- private static byte ACK_BYTE = 37;
-
/**
* @param msToWait number of milliseconds to wait for an ack. If 0 then wait forever.
* @param msInterval interval between checks
@@ -3899,7 +3831,7 @@ public class Connection implements Runnable {
// } else {
// ConnectionTable.threadWantsSharedResources();
}
- this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber);
+ this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
// Because this thread is not shared resource, it will be used for direct
// ack. Direct ack messages can be large. This call will resize the send
// buffer.
@@ -4039,6 +3971,10 @@ public class Connection implements Runnable {
}
}
+ protected TCPConduit getConduit() {
+ return this.conduit;
+ }
+
protected Socket getSocket() throws SocketException {
// fix for bug 37286
Socket result = this.socket;
@@ -4177,7 +4113,7 @@ public class Connection implements Runnable {
boolean nioChecked;
boolean useNIO;
- private final boolean useNIO() {
+ private boolean useNIO() {
if (TCPConduit.useSSL) {
return false;
}
@@ -4193,7 +4129,7 @@ public class Connection implements Runnable {
if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) {
String os = System.getProperty("os.name");
if (os != null) {
- if (os.indexOf("Windows") != -1) {
+ if (os.contains("Windows")) {
this.useNIO = false;
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 08a9009..c55af82 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -64,17 +64,9 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
*
* @since GemFire 2.1
*/
-/*
- * Note: We no longer use InputMultiplexer If InputMux is reinstated then the manager needs to be
- * initialized and all lines that have a NOMUX preface should be uncommented
- *
- */
public class ConnectionTable {
private static final Logger logger = LogService.getLogger();
- /** a random number generator for secondary connection selection */
- // static java.util.Random random = new java.util.Random();
-
/** warning when descriptor limit reached */
private static boolean ulimitWarningIssued;
@@ -82,6 +74,7 @@ public class ConnectionTable {
* true if the current thread wants non-shared resources
*/
private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
+
/**
* Used for messages whose order must be preserved Only connections used for sending messages, and
* receiving acks, will be put in this map.
@@ -132,9 +125,7 @@ public class ConnectionTable {
/**
* the conduit for this table
*/
- protected final TCPConduit owner;
- // ARB: temp making this protected to provide access to Connection.
- // private final TCPConduit owner;
+ private final TCPConduit owner;
/**
* true if this table is no longer in use
@@ -205,17 +196,10 @@ public class ConnectionTable {
return (Boolean) threadWantsOwnResources.get();
}
- // public static void setThreadOwnsResourcesRegistration(
- // Boolean newValue) {
- // threadWantsOwnResources.set(newValue);
- // }
- // private Map connections = new HashMap();
- /* NOMUX: private InputMuxManager inputMuxManager; */
- // private int lowWater;
- // private int highWater;
+ public TCPConduit getOwner() {
+ return owner;
+ }
- // private static boolean TRACK_SERVER_CONNECTIONS =
- // System.getProperty("p2p.bidirectional", "true").equals("true");
private ConnectionTable(TCPConduit c) throws IOException {
this.owner = c;
@@ -226,10 +210,6 @@ public class ConnectionTable {
this.threadConnectionMap = new ConcurrentHashMap();
this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
this.socketCloser = new SocketCloser();
- /*
- * NOMUX: if (TCPConduit.useNIO) { inputMuxManager = new InputMuxManager(this);
- * inputMuxManager.start(c.logger); }
- */
}
private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -306,7 +286,6 @@ public class ConnectionTable {
}
}
- // Stub id = conn.getRemoteId();
if (conn != null) {
synchronized (this.receivers) {
this.owner.stats.incReceivers();
@@ -322,22 +301,9 @@ public class ConnectionTable {
conn.remoteAddr);
}
}
- // cleanupHighWater();
}
- // /** returns the connection associated with the given key, or null if
- // no such connection exists */
- // protected Connection basicGet(Serializable id) {
- // synchronized (this.orderedConnectionMap) {
- // return (Connection) this.orderedConnectionMap.get(id);
- // }
- // }
-
- // protected Connection get(Serializable id) throws java.io.IOException {
- // return get(id, false);
- // }
-
/**
* Process a newly created PendingConnection
@@ -432,10 +398,10 @@ public class ConnectionTable {
}
/**
- * unordered or conserve-sockets note that unordered connections are currently always shared
+ * unordered or conserve-sockets=true note that unordered connections are currently always shared
*
* @param id the DistributedMember on which we are creating a connection
- * @param threadOwnsResources whether unordered conn is owned by the current thread
+ * @param scheduleTimeout whether unordered connection should time out
* @param preserveOrder whether to preserve order
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
@@ -444,9 +410,9 @@ public class ConnectionTable {
* @throws IOException if unable to create the connection
* @throws DistributedSystemDisconnectedException
*/
- private Connection getUnorderedOrConserveSockets(DistributedMember id,
- boolean threadOwnsResources, boolean preserveOrder, long startTime, long ackTimeout,
- long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
+ private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout,
+ boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout)
+ throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap;
@@ -472,7 +438,7 @@ public class ConnectionTable {
if (pc != null) {
result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc,
startTime, ackTimeout, ackSATimeout);
- if (!preserveOrder && threadOwnsResources) {
+ if (!preserveOrder && scheduleTimeout) {
scheduleIdleTimeout(result);
}
} else { // we have existing connection
@@ -487,10 +453,10 @@ public class ConnectionTable {
startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
if (result != null) {
- logger.debug("getUnorderedOrConserveSockets {} myAddr={} theirAddr={}", result,
+ logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
getConduit().getMemberId(), result.remoteAddr);
} else {
- logger.debug("getUnorderedOrConserveSockets: Connect failed");
+ logger.debug("getSharedConnection: Connect failed");
}
}
} else {
@@ -512,7 +478,7 @@ public class ConnectionTable {
* @throws IOException if the connection could not be created
* @throws DistributedSystemDisconnectedException
*/
- Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout,
+ Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
Connection result = null;
@@ -658,10 +624,10 @@ public class ConnectionTable {
Connection result = null;
boolean threadOwnsResources = threadOwnsResources();
if (!preserveOrder || !threadOwnsResources) {
- result = getUnorderedOrConserveSockets(id, threadOwnsResources, preserveOrder, startTime,
- ackTimeout, ackSATimeout);
+ result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
+ ackSATimeout);
} else {
- result = getOrderedAndOwned(id, startTime, ackTimeout, ackSATimeout);
+ result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
}
if (result != null) {
Assert.assertTrue(result.preserveOrder == preserveOrder);
http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 3872ee9..bf06953 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -56,7 +56,7 @@ class DirectReplySender implements ReplySender {
// mutates the list when it has exceptions.
// fix for bug #42199 - cancellation check
- this.conn.owner.getDM().getCancelCriterion().checkCancelInProgress(null);
+ this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "Sending a direct reply {} to {}", msg, conn.getRemoteAddress());
http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index fc56271..be1f533 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -94,7 +94,7 @@ public abstract class MsgReader {
public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
protected DMStats getStats() {
- return conn.owner.getConduit().stats;
+ return conn.getConduit().stats;
}
public static class Header {
http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
index 50f5fae..a4e35a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
@@ -79,7 +79,7 @@ public class NIOMsgReader extends MsgReader {
if (nioInputBuffer == null) {
int allocSize = conn.getReceiveBufferSize();
if (allocSize == -1) {
- allocSize = conn.owner.getConduit().tcpBufferSize;
+ allocSize = conn.getConduit().tcpBufferSize;
}
if (allocSize > bufferSize) {
bufferSize = allocSize;
[2/2] geode git commit: GEODE-2732 after auto-reconnect a server is restarted on the default port
Posted by bs...@apache.org.
GEODE-2732 after auto-reconnect a server is restarted on the default port
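Gfsh command-line parameters (server port, server bind address, and
disable-default-server) were put into ThreadLocals to make them
available to the XML parser. ThreadLocal values are only visible to the
thread that set them, so the auto-reconnect thread could not see them and
restarted the server on the default port. These are now held in
non-thread-local static variables in CacheServerLauncher so that all
threads, including the auto-reconnect thread, can see them when building
the cache.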
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/391502a2
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/391502a2
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/391502a2
Branch: refs/heads/develop
Commit: 391502a2615dbc80cde0b9a111fa967f4d76c39a
Parents: 39c72b2
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Apr 5 15:11:04 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700
----------------------------------------------------------------------
.../geode/distributed/ServerLauncher.java | 10 +-
.../internal/cache/CacheServerLauncher.java | 43 +++++---
.../internal/cache/xmlcache/CacheCreation.java | 2 +-
.../cache30/ReconnectWithCacheXMLDUnitTest.java | 107 +++++++++++++++++++
.../ReconnectWithUDPSecurityDUnitTest.java | 6 ++
.../ReconnectedCacheServerDUnitTest.java | 4 +-
.../cache30/ReconnectWithCacheXMLDUnitTest.xml | 25 +++++
7 files changed, 176 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
index 9435bd8..c96732c 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/ServerLauncher.java
@@ -274,7 +274,7 @@ public class ServerLauncher extends AbstractLauncher<String> {
this.assignBuckets = Boolean.TRUE.equals(builder.getAssignBuckets());
setDebug(Boolean.TRUE.equals(builder.getDebug()));
this.disableDefaultServer = Boolean.TRUE.equals(builder.getDisableDefaultServer());
- CacheServerLauncher.disableDefaultServer.set(this.disableDefaultServer);
+ CacheServerLauncher.setDisableDefaultServer(this.disableDefaultServer);
this.distributedSystemProperties = builder.getDistributedSystemProperties();
this.force = Boolean.TRUE.equals(builder.getForce());
this.help = Boolean.TRUE.equals(builder.getHelp());
@@ -286,11 +286,11 @@ public class ServerLauncher extends AbstractLauncher<String> {
this.redirectOutput = Boolean.TRUE.equals(builder.getRedirectOutput());
this.serverBindAddress = builder.getServerBindAddress();
if (builder.isServerBindAddressSetByUser() && this.serverBindAddress != null) {
- CacheServerLauncher.serverBindAddress.set(this.serverBindAddress.getHostAddress());
+ CacheServerLauncher.setServerBindAddress(this.serverBindAddress.getHostAddress());
}
this.serverPort = builder.getServerPort();
if (builder.isServerPortSetByUser() && this.serverPort != null) {
- CacheServerLauncher.serverPort.set(this.serverPort);
+ CacheServerLauncher.setServerPort(this.serverPort);
}
this.springXmlLocation = builder.getSpringXmlLocation();
this.workingDirectory = builder.getWorkingDirectory();
@@ -954,8 +954,8 @@ public class ServerLauncher extends AbstractLauncher<String> {
final String serverBindAddress =
(getServerBindAddress() == null ? null : getServerBindAddress().getHostAddress());
final Integer serverPort = getServerPort();
- CacheServerLauncher.serverBindAddress.set(serverBindAddress);
- CacheServerLauncher.serverPort.set(serverPort);
+ CacheServerLauncher.setServerBindAddress(serverBindAddress);
+ CacheServerLauncher.setServerPort(serverPort);
final CacheServer cacheServer = cache.addCacheServer();
cacheServer.setBindAddress(serverBindAddress);
cacheServer.setPort(serverPort);
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
index 760abd3..9a544d2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerLauncher.java
@@ -565,28 +565,43 @@ public class CacheServerLauncher {
}
}
- public static ThreadLocal<Integer> serverPort = new ThreadLocal<Integer>();
+ private static Integer serverPort;
+
+ private static String serverBindAddress;
+
+ public static void setServerPort(Integer serverPort) {
+ CacheServerLauncher.serverPort = serverPort;
+ }
+
+ public static void setServerBindAddress(String serverBindAddress) {
+ CacheServerLauncher.serverBindAddress = serverBindAddress;
+ }
+
+ public static void setDisableDefaultServer(Boolean disableDefaultServer) {
+ CacheServerLauncher.disableDefaultServer = disableDefaultServer;
+ }
+
+ public static Boolean disableDefaultServer;
+
- public static ThreadLocal<String> serverBindAddress = new ThreadLocal<String>();
public static Integer getServerPort() {
- return serverPort.get();
+ return serverPort;
}
public static String getServerBindAddress() {
- return serverBindAddress.get();
+ return serverBindAddress;
}
- public static ThreadLocal<Boolean> disableDefaultServer = new ThreadLocal<Boolean>();
-
public static Boolean getDisableDefaultServer() {
- return disableDefaultServer.get();
+ return disableDefaultServer;
}
+
public static void clearStatics() {
- disableDefaultServer.set(null);
- serverPort.set(null);
- serverBindAddress.set(null);
+ disableDefaultServer = null;
+ serverPort = null;
+ serverBindAddress = null;
}
@@ -616,11 +631,11 @@ public class CacheServerLauncher {
final String serverPortString = (String) options.get(SERVER_PORT);
if (serverPortString != null) {
- serverPort.set(Integer.parseInt(serverPortString));
+ serverPort = Integer.parseInt(serverPortString);
}
- serverBindAddress.set((String) options.get(SERVER_BIND_ADDRESS_NAME));
- disableDefaultServer.set((Boolean) options.get(DISABLE_DEFAULT_SERVER));
+ serverBindAddress = (String) options.get(SERVER_BIND_ADDRESS_NAME);
+ disableDefaultServer = (Boolean) options.get(DISABLE_DEFAULT_SERVER);
workingDir = new File(System.getProperty("user.dir"));
// Say that we're starting...
@@ -835,7 +850,7 @@ public class CacheServerLauncher {
// Create and start a default cache server
// If (disableDefaultServer is not set or it is set but false) AND (the number of cacheservers
// is 0)
- Boolean disable = disableDefaultServer.get();
+ Boolean disable = disableDefaultServer;
if ((disable == null || !disable) && cache.getCacheServers().size() == 0) {
// Create and add a cache server
CacheServer server = cache.addCacheServer();
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index 1c3c933..a0810d9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -503,7 +503,7 @@ public class CacheCreation implements InternalCache {
Integer serverPort = CacheServerLauncher.getServerPort();
String serverBindAdd = CacheServerLauncher.getServerBindAddress();
- Boolean disableDefaultServer = CacheServerLauncher.disableDefaultServer.get();
+ Boolean disableDefaultServer = CacheServerLauncher.getDisableDefaultServer();
startCacheServers(this.getCacheServers(), cache, serverPort, serverBindAdd,
disableDefaultServer);
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
new file mode 100755
index 0000000..4f2fac1
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache30;
+
+import static org.apache.geode.internal.Assert.assertTrue;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.membership.MembershipTestHook;
+import org.apache.geode.distributed.internal.membership.gms.MembershipManagerHelper;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheServerLauncher;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.apache.geode.util.test.TestUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
+public class ReconnectWithCacheXMLDUnitTest extends JUnit4CacheTestCase {
+
+
+ public ReconnectWithCacheXMLDUnitTest() {
+ super();
+ }
+
+ private static final long serialVersionUID = 1L;
+
+
+ private String xmlProperty = DistributionConfig.GEMFIRE_PREFIX + "autoReconnect-useCacheXMLFile";
+ private String oldPropertySetting;
+
+ @Override
+ public final void postSetUp() {
+ oldPropertySetting = System.setProperty(xmlProperty, "true");
+ }
+
+ @Override
+ public final void preTearDownCacheTestCase() throws Exception {
+ if (oldPropertySetting == null) {
+ System.getProperties().remove(xmlProperty);
+ } else {
+ System.setProperty(xmlProperty, oldPropertySetting);
+ }
+ }
+
+ @Override
+ public Properties getDistributedSystemProperties() {
+ Properties result = super.getDistributedSystemProperties();
+ String fileName = TestUtil.getResourcePath(getClass(), "ReconnectWithCacheXMLDUnitTest.xml");
+ result.put(ConfigurationProperties.CACHE_XML_FILE, fileName);
+ result.put(ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION, "true");
+ result.put(ConfigurationProperties.DISABLE_AUTO_RECONNECT, "false");
+ result.put(ConfigurationProperties.MAX_WAIT_TIME_RECONNECT, "2000");
+ return result;
+ }
+
+ @Test
+ public void testCacheServerLauncherPortRetained() throws Exception {
+ CacheServerLauncher.setDisableDefaultServer(true);
+ CacheServerLauncher.setServerPort(AvailablePortHelper.getRandomAvailableTCPPort());
+ Cache cache = getCache();
+
+ final AtomicBoolean membershipFailed = new AtomicBoolean();
+ MembershipManagerHelper.addTestHook(cache.getDistributedSystem(), new MembershipTestHook() {
+ @Override
+ public void beforeMembershipFailure(String reason, Throwable cause) {
+ membershipFailed.set(true);
+ }
+
+ @Override
+ public void afterMembershipFailure(String reason, Throwable cause) {}
+ });
+ MembershipManagerHelper.crashDistributedSystem(cache.getDistributedSystem());
+ assertTrue(membershipFailed.get());
+
+ await().atMost(60, TimeUnit.SECONDS).until(() -> cache.getReconnectedCache() != null);
+
+ Cache newCache = cache.getReconnectedCache();
+ CacheServer server = newCache.getCacheServers().iterator().next();
+ assertEquals(CacheServerLauncher.getServerPort().intValue(), server.getPort());
+ assertEquals(20, server.getMaxConnections()); // this setting is in the XML file
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
index a52d8bf..55d0a3c 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectWithUDPSecurityDUnitTest.java
@@ -17,8 +17,14 @@ package org.apache.geode.cache30;
import java.util.Properties;
import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
+import org.junit.experimental.categories.Category;
+
import static org.apache.geode.distributed.ConfigurationProperties.*;
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
public class ReconnectWithUDPSecurityDUnitTest extends ReconnectDUnitTest {
public ReconnectWithUDPSecurityDUnitTest() {
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
index 3224257..2a2fe73 100755
--- a/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ReconnectedCacheServerDUnitTest.java
@@ -14,6 +14,8 @@
*/
package org.apache.geode.cache30;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.MembershipTest;
import org.junit.experimental.categories.Category;
import org.junit.Test;
@@ -32,7 +34,7 @@ import org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipMan
import org.apache.geode.internal.cache.GemFireCacheImpl;
-@Category(DistributedTest.class)
+@Category({DistributedTest.class, MembershipTest.class, ClientServerTest.class})
public class ReconnectedCacheServerDUnitTest extends JUnit4CacheTestCase {
public ReconnectedCacheServerDUnitTest() {
http://git-wip-us.apache.org/repos/asf/geode/blob/391502a2/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml b/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
new file mode 100644
index 0000000..f7e338b
--- /dev/null
+++ b/geode-core/src/test/resources/org/apache/geode/cache30/ReconnectWithCacheXMLDUnitTest.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<cache
+ xmlns="http://geode.apache.org/schema/cache"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://geode.apache.org/schema/cache http://geode.apache.org/schema/cache/cache-1.0.xsd"
+ version="1.0">
+ <cache-server max-connections="20"/>
+</cache>
+