You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/07/12 20:01:02 UTC
[3/8] incubator-geode git commit: GEODE-420: Clean up of
SocketCreator code in tests. SocketCreatorFactory currently singleton,
to amend at later stage
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
index ed570c1..966de54 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java
@@ -16,66 +16,90 @@
*/
package com.gemstone.gemfire.internal.tcp;
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.SSLException;
+
+import org.apache.logging.log4j.Logger;
+
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
-import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.LonerDistributionManager;
import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
-import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import org.apache.logging.log4j.Logger;
-
-import javax.net.ssl.SSLException;
-import java.io.IOException;
-import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.*;
-
-/** <p>TCPConduit manages a server socket and a collection of connections to
- other systems. Connections are identified by DistributedMember IDs.
- These types of messages are currently supported:</p><pre>
-
- DistributionMessage - message is delivered to the server's
- ServerDelegate
-
- </pre>
- <p>In the current implementation, ServerDelegate is the DirectChannel
- used by the GemFire DistributionManager to send and receive messages.<p>
- If the ServerDelegate is null, DistributionMessages are ignored by
- the TCPConduit.</p>
-
- @since GemFire 2.0
-
-*/
+import com.gemstone.gemfire.internal.net.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
+
+/**
+ * <p>TCPConduit manages a server socket and a collection of connections to
+ * other systems. Connections are identified by DistributedMember IDs.
+ * These types of messages are currently supported:</p><pre>
+ * <p>
+ * DistributionMessage - message is delivered to the server's
+ * ServerDelegate
+ * <p>
+ * </pre>
+ * <p>In the current implementation, ServerDelegate is the DirectChannel
+ * used by the GemFire DistributionManager to send and receive messages.<p>
+ * If the ServerDelegate is null, DistributionMessages are ignored by
+ * the TCPConduit.</p>
+ * @since GemFire 2.0
+ */
public class TCPConduit implements Runnable {
+
private static final Logger logger = LogService.getLogger();
- /** max amount of time (ms) to wait for listener threads to stop */
+ /**
+ * max amount of time (ms) to wait for listener threads to stop
+ */
private static int LISTENER_CLOSE_TIMEOUT;
- /** backlog is the "accept" backlog configuration parameter all
- conduits server socket */
+ /**
+ * backlog is the "accept" backlog configuration parameter all
+ * conduits server socket
+ */
private static int BACKLOG;
-
- /** use javax.net.ssl.SSLServerSocketFactory? */
+
+ /**
+ * use javax.net.ssl.SSLServerSocketFactory?
+ */
static boolean useSSL;
-// public final static boolean USE_SYNC_WRITES = Boolean.getBoolean("p2p.useSyncWrites");
+ // public final static boolean USE_SYNC_WRITES = Boolean.getBoolean("p2p.useSyncWrites");
/**
* Force use of Sockets rather than SocketChannels (NIO). Note from Bruce: due to
@@ -84,18 +108,22 @@ public class TCPConduit implements Runnable {
*/
private static boolean USE_NIO;
- /** use direct ByteBuffers instead of heap ByteBuffers for NIO operations */
+ /**
+ * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
+ */
static boolean useDirectBuffers;
-
+
private volatile boolean inhibitNewConnections;
-// private transient DistributedMembershipListener messageReceiver;
-
+ // private transient DistributedMembershipListener messageReceiver;
+
private MembershipManager membershipManager;
-
- /** true if NIO can be used for the server socket */
+
+ /**
+ * true if NIO can be used for the server socket
+ */
private boolean useNIO;
-
+
static {
init();
}
@@ -107,7 +135,7 @@ public class TCPConduit implements Runnable {
public static int getBackLog() {
return BACKLOG;
}
-
+
public static void init() {
useSSL = Boolean.getBoolean("p2p.useSSL");
// only use nio if not SSL
@@ -116,33 +144,45 @@ public class TCPConduit implements Runnable {
useDirectBuffers = USE_NIO && !Boolean.getBoolean("p2p.nodirectBuffers");
LISTENER_CLOSE_TIMEOUT = Integer.getInteger("p2p.listenerCloseTimeout", 60000).intValue();
// fix for bug 37730
- BACKLOG = Integer.getInteger("p2p.backlog", HANDSHAKE_POOL_SIZE+1).intValue();
+ BACKLOG = Integer.getInteger("p2p.backlog", HANDSHAKE_POOL_SIZE + 1).intValue();
}
///////////////// permanent conduit state
- /** the size of OS TCP/IP buffers, not set by default */
+ /**
+ * the size of OS TCP/IP buffers, not set by default
+ */
public int tcpBufferSize = DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
public int idleConnectionTimeout = DistributionConfig.DEFAULT_SOCKET_LEASE_TIME;
- /** port is the tcp/ip port that this conduit binds to. If it is zero, a port
- from membership-port-range is selected to bind to. The actual port number this
- conduit is listening on will be in the "id" instance variable */
+ /**
+ * port is the tcp/ip port that this conduit binds to. If it is zero, a port
+ * from membership-port-range is selected to bind to. The actual port number this
+ * conduit is listening on will be in the "id" instance variable
+ */
private int port;
private int[] tcpPortRange = new int[] { 1024, 65535 };
- /** The java groups address that this conduit is associated with */
+ /**
+ * The java groups address that this conduit is associated with
+ */
private InternalDistributedMember localAddr;
-
- /** address is the InetAddress that this conduit uses for identity */
+
+ /**
+ * address is the InetAddress that this conduit uses for identity
+ */
private final InetAddress address;
-
- /** isBindAddress is true if we should bind to the address */
+
+ /**
+ * isBindAddress is true if we should bind to the address
+ */
private final boolean isBindAddress;
- /** the object that receives DistributionMessage messages
- received by this conduit. */
+ /**
+ * the object that receives DistributionMessage messages
+ * received by this conduit.
+ */
private final DirectChannel directChannel;
/**
* Stats from the delegate
@@ -154,42 +194,49 @@ public class TCPConduit implements Runnable {
* @since GemFire 4.2.1
*/
DistributionConfig config;
-
+
////////////////// runtime state that is re-initialized on a restart
- /** server socket address */
+ /**
+ * server socket address
+ */
private InetSocketAddress id;
protected volatile boolean stopped;
- /** the listener thread */
+ /**
+ * the listener thread
+ */
private Thread thread;
- /** if using NIO, this is the object used for accepting connections */
+ /**
+ * if using NIO, this is the object used for accepting connections
+ */
private ServerSocketChannel channel;
- /** the server socket */
+ /**
+ * the server socket
+ */
private ServerSocket socket;
- /** a table of Connections from this conduit to others
+ /**
+ * a table of Connections from this conduit to others
*/
private ConnectionTable conTable;
- /** <p>creates a new TCPConduit bound to the given InetAddress and port.
- The given ServerDelegate will receive any DistributionMessages
- passed to the conduit.</p>
- <p>This constructor forces the conduit to ignore the following
- system properties and look for them only in the <i>props</i> argument:</p>
- <pre>
- p2p.tcpBufferSize
- p2p.idleConnectionTimeout
- </pre>
- */
- public TCPConduit(MembershipManager mgr, int port,
- InetAddress address, boolean isBindAddress,
- DirectChannel receiver, Properties props)
- throws ConnectionException
- {
+ /**
+ * <p>creates a new TCPConduit bound to the given InetAddress and port.
+ * The given ServerDelegate will receive any DistributionMessages
+ * passed to the conduit.</p>
+ * <p>This constructor forces the conduit to ignore the following
+ * system properties and look for them only in the <i>props</i> argument:</p>
+ * <pre>
+ * p2p.tcpBufferSize
+ * p2p.idleConnectionTimeout
+ * </pre>
+ */
+ public TCPConduit(MembershipManager mgr, int port, InetAddress address, boolean isBindAddress, DirectChannel receiver, Properties props)
+ throws ConnectionException {
parseProperties(props);
this.address = address;
@@ -209,8 +256,7 @@ public class TCPConduit implements Runnable {
try {
this.conTable = ConnectionTable.create(this);
- }
- catch (IOException io) {
+ } catch (IOException io) {
throw new ConnectionException(LocalizedStrings.TCPConduit_UNABLE_TO_INITIALIZE_CONNECTION_TABLE.toLocalizedString(), io);
}
this.useNIO = USE_NIO;
@@ -219,8 +265,7 @@ public class TCPConduit implements Runnable {
if (addr == null) {
try {
addr = SocketCreator.getLocalHost();
- }
- catch (java.net.UnknownHostException e) {
+ } catch (java.net.UnknownHostException e) {
throw new ConnectionException("Unable to resolve localHost address", e);
}
}
@@ -234,96 +279,109 @@ public class TCPConduit implements Runnable {
}
}
}
-
+
startAcceptor();
}
- /** parse instance-level properties from the given object */
+ /**
+ * parse instance-level properties from the given object
+ */
private void parseProperties(Properties p) {
if (p != null) {
String s;
- s = p.getProperty("p2p.tcpBufferSize", ""+tcpBufferSize);
- try { tcpBufferSize = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE), e); }
+ s = p.getProperty("p2p.tcpBufferSize", "" + tcpBufferSize);
+ try {
+ tcpBufferSize = Integer.parseInt(s);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE), e);
+ }
if (tcpBufferSize < Connection.SMALL_BUFFER_SIZE) {
// enforce minimum
tcpBufferSize = Connection.SMALL_BUFFER_SIZE;
}
- s = p.getProperty("p2p.idleConnectionTimeout", ""+idleConnectionTimeout);
- try { idleConnectionTimeout = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT), e); }
-
+ s = p.getProperty("p2p.idleConnectionTimeout", "" + idleConnectionTimeout);
+ try {
+ idleConnectionTimeout = Integer.parseInt(s);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT), e);
+ }
+
s = p.getProperty("membership_port_range_start");
- try { tcpPortRange[0] = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGESTART), e); }
-
+ try {
+ tcpPortRange[0] = Integer.parseInt(s);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGESTART), e);
+ }
+
s = p.getProperty("membership_port_range_end");
- try { tcpPortRange[1] = Integer.parseInt(s); } catch (Exception e) { logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGEEND), e); }
-
+ try {
+ tcpPortRange[1] = Integer.parseInt(s);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_EXCEPTION_PARSING_TCPPORTRANGEEND), e);
+ }
+
}
}
private ThreadPoolExecutor hsPool;
- /** the reason for a shutdown, if abnormal */
+ /**
+ * the reason for a shutdown, if abnormal
+ */
private volatile Exception shutdownCause;
private final static int HANDSHAKE_POOL_SIZE = Integer.getInteger("p2p.HANDSHAKE_POOL_SIZE", 10).intValue();
private final static long HANDSHAKE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.HANDSHAKE_POOL_KEEP_ALIVE_TIME", 60).longValue();
- /** added to fix bug 40436 */
+ /**
+ * added to fix bug 40436
+ */
public void setMaximumHandshakePoolSize(int maxSize) {
if (this.hsPool != null && maxSize > HANDSHAKE_POOL_SIZE) {
this.hsPool.setMaximumPoolSize(maxSize);
}
}
- /** binds the server socket and gets threads going
- *
- * */
+ /**
+ * binds the server socket and gets threads going
+ */
private void startAcceptor() throws ConnectionException {
int localPort;
int p = this.port;
InetAddress ba = this.address;
-
+
{
ThreadPoolExecutor tmp_hsPool = null;
String gName = "P2P-Handshaker " + ba + ":" + p;
- final ThreadGroup socketThreadGroup
- = LoggingThreadGroup.createThreadGroup(gName, logger);
-
+ final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
ThreadFactory socketThreadFactory = new ThreadFactory() {
- int connNum = -1;
+ int connNum = -1;
- public Thread newThread(Runnable command) {
- int tnum;
- synchronized (this) {
- tnum = ++connNum;
- }
- String tName = socketThreadGroup.getName() + " Thread " + tnum;
- return new Thread(socketThreadGroup, command, tName);
+ public Thread newThread(Runnable command) {
+ int tnum;
+ synchronized (this) {
+ tnum = ++connNum;
}
- };
+ String tName = socketThreadGroup.getName() + " Thread " + tnum;
+ return new Thread(socketThreadGroup, command, tName);
+ }
+ };
try {
final BlockingQueue bq = new SynchronousQueue();
final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
- try {
- bq.put(r);
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt(); // preserve the state
- throw new RejectedExecutionException(LocalizedStrings.TCPConduit_INTERRUPTED.toLocalizedString(), ex);
- }
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+ try {
+ bq.put(r);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // preserve the state
+ throw new RejectedExecutionException(LocalizedStrings.TCPConduit_INTERRUPTED.toLocalizedString(), ex);
}
- };
- tmp_hsPool = new ThreadPoolExecutor(1,
- HANDSHAKE_POOL_SIZE,
- HANDSHAKE_POOL_KEEP_ALIVE_TIME,
- TimeUnit.SECONDS,
- bq,
- socketThreadFactory,
- reh);
- }
- catch (IllegalArgumentException poolInitException) {
+ }
+ };
+ tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, HANDSHAKE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, bq, socketThreadFactory, reh);
+ } catch (IllegalArgumentException poolInitException) {
throw new ConnectionException(LocalizedStrings.TCPConduit_WHILE_CREATING_HANDSHAKE_POOL.toLocalizedString(), poolInitException);
}
this.hsPool = tmp_hsPool;
@@ -334,44 +392,43 @@ public class TCPConduit implements Runnable {
id = new InetSocketAddress(socket.getInetAddress(), localPort);
stopped = false;
- ThreadGroup group =
- LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
+ ThreadGroup group = LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger);
thread = new Thread(group, this, "P2P Listener Thread " + id);
thread.setDaemon(true);
- try { thread.setPriority(thread.getThreadGroup().getMaxPriority()); }
- catch (Exception e) {
+ try {
+ thread.setPriority(thread.getThreadGroup().getMaxPriority());
+ } catch (Exception e) {
logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_SET_LISTENER_PRIORITY__0, e.getMessage()));
}
if (!Boolean.getBoolean("p2p.test.inhibitAcceptor")) {
thread.start();
- }
- else {
+ } else {
logger.fatal(LocalizedMessage.create(LocalizedStrings.TCPConduit_INHIBITACCEPTOR));
socket.close();
this.hsPool.shutdownNow();
}
- }
- catch (IOException io) {
+ } catch (IOException io) {
String s = "While creating ServerSocket on port " + p;
throw new ConnectionException(s, io);
}
this.port = localPort;
}
-
- /** creates the server sockets. This can be used to recreate the
- * socket using this.port and this.bindAddress, which must be set
- * before invoking this method.
+
+ /**
+ * creates the server sockets. This can be used to recreate the
+ * socket using this.port and this.bindAddress, which must be set
+ * before invoking this method.
*/
private void createServerSocket() {
int p = this.port;
int b = BACKLOG;
InetAddress ba = this.address;
-
+
try {
if (this.useNIO) {
if (p <= 0) {
- socket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(ba, b, isBindAddress,
- this.useNIO, 0, tcpPortRange);
+
+ socket = SocketCreatorFactory.getClusterSSLSocketCreator().createServerSocketUsingPortRange(ba, b, isBindAddress, this.useNIO, 0, tcpPortRange);
} else {
ServerSocketChannel channl = ServerSocketChannel.open();
socket = channl.socket();
@@ -387,92 +444,91 @@ public class TCPConduit implements Runnable {
socket.setReceiveBufferSize(tcpBufferSize);
int newSize = socket.getReceiveBufferSize();
if (newSize != tcpBufferSize) {
- logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
- new Object[] {"Listener receiverBufferSize", Integer.valueOf(newSize), Integer.valueOf(tcpBufferSize)}));
+ logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2, new Object[] {
+ "Listener receiverBufferSize",
+ Integer.valueOf(newSize),
+ Integer.valueOf(tcpBufferSize)
+ }));
}
} catch (SocketException ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_SET_LISTENER_RECEIVERBUFFERSIZE_TO__0, tcpBufferSize));
}
}
channel = socket.getChannel();
- }
- else {
+ } else {
try {
if (p <= 0) {
- socket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(ba, b, isBindAddress,
- this.useNIO, this.tcpBufferSize, tcpPortRange);
+ socket = SocketCreatorFactory.getClusterSSLSocketCreator()
+ .createServerSocketUsingPortRange(ba, b, isBindAddress, this.useNIO, this.tcpBufferSize, tcpPortRange);
} else {
- socket = SocketCreator.getDefaultInstance().createServerSocket(p, b, isBindAddress? ba : null, this.tcpBufferSize);
+ socket = SocketCreatorFactory.getClusterSSLSocketCreator().createServerSocket(p, b, isBindAddress ? ba : null, this.tcpBufferSize);
}
int newSize = socket.getReceiveBufferSize();
if (newSize != this.tcpBufferSize) {
- logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
- new Object[] {"Listener receiverBufferSize", Integer.valueOf(newSize), Integer.valueOf(this.tcpBufferSize)}));
+ logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_0_IS_1_INSTEAD_OF_THE_REQUESTED_2, new Object[] {
+ "Listener receiverBufferSize",
+ Integer.valueOf(newSize),
+ Integer.valueOf(this.tcpBufferSize)
+ }));
}
} catch (SocketException ex) {
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_SET_LISTENER_RECEIVERBUFFERSIZE_TO__0, this.tcpBufferSize));
-
+
}
}
port = socket.getLocalPort();
- }
- catch (IOException io) {
- throw new ConnectionException( LocalizedStrings.TCPConduit_EXCEPTION_CREATING_SERVERSOCKET.toLocalizedString(
- new Object[] {Integer.valueOf(p), ba}), io);
+ } catch (IOException io) {
+ throw new ConnectionException(LocalizedStrings.TCPConduit_EXCEPTION_CREATING_SERVERSOCKET.toLocalizedString(new Object[] { Integer.valueOf(p), ba }), io);
}
}
/**
* Ensure that the ConnectionTable class gets loaded.
- *
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
ConnectionTable.loadEmergencyClasses();
}
-
+
/**
* Close the ServerSocketChannel, ServerSocket, and the
* ConnectionTable.
- *
* @see SystemFailure#emergencyClose()
*/
public void emergencyClose() {
-// stop(); // Causes grief
+ // stop(); // Causes grief
if (stopped) {
return;
}
-
+
stopped = true;
-// System.err.println("DEBUG: TCPConduit emergencyClose");
+ // System.err.println("DEBUG: TCPConduit emergencyClose");
try {
if (channel != null) {
channel.close();
// NOTE: do not try to interrupt the listener thread at this point.
// Doing so interferes with the channel's socket logic.
- }
- else {
+ } else {
if (socket != null) {
socket.close();
}
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
// ignore, please!
}
// this.hsPool.shutdownNow(); // I don't trust this not to allocate objects or to synchronize
-// this.conTable.close(); not safe against deadlocks
+ // this.conTable.close(); not safe against deadlocks
ConnectionTable.emergencyClose();
-
+
socket = null;
thread = null;
conTable = null;
-
-// System.err.println("DEBUG: end of TCPConduit emergencyClose");
+
+ // System.err.println("DEBUG: end of TCPConduit emergencyClose");
}
-
+
/* stops the conduit, closing all tcp/ip connections */
public void stop(Exception cause) {
if (!stopped) {
@@ -486,13 +542,13 @@ public class TCPConduit implements Runnable {
// set timeout endpoint here since interrupt() has been known
// to hang
long timeout = System.currentTimeMillis() + LISTENER_CLOSE_TIMEOUT;
- Thread t = this.thread;;
+ Thread t = this.thread;
+ ;
if (channel != null) {
channel.close();
// NOTE: do not try to interrupt the listener thread at this point.
// Doing so interferes with the channel's socket logic.
- }
- else {
+ } else {
ServerSocket s = this.socket;
if (s != null) {
s.close();
@@ -501,7 +557,7 @@ public class TCPConduit implements Runnable {
t.interrupt();
}
}
-
+
do {
t = this.thread;
if (t == null || !t.isAlive()) {
@@ -511,15 +567,13 @@ public class TCPConduit implements Runnable {
} while (timeout > System.currentTimeMillis());
if (t != null && t.isAlive()) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_UNABLE_TO_SHUT_DOWN_LISTENER_WITHIN_0_MS_UNABLE_TO_INTERRUPT_SOCKET_ACCEPT_DUE_TO_JDK_BUG_GIVING_UP,
- Integer.valueOf(LISTENER_CLOSE_TIMEOUT)));
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_SHUT_DOWN_LISTENER_WITHIN_0_MS_UNABLE_TO_INTERRUPT_SOCKET_ACCEPT_DUE_TO_JDK_BUG_GIVING_UP, Integer
+ .valueOf(LISTENER_CLOSE_TIMEOUT)));
}
} catch (IOException e) {
} catch (InterruptedException e) {
// Ignore, we're trying to stop already.
- }
- finally {
+ } finally {
this.hsPool.shutdownNow();
}
@@ -534,18 +588,20 @@ public class TCPConduit implements Runnable {
/**
* Returns whether or not this conduit is stopped
- *
* @since GemFire 3.0
*/
public boolean isStopped() {
return this.stopped;
}
- /** starts the conduit again after it's been stopped. This will clear the
- server map if the conduit's port is zero (wildcard bind) */
+ /**
+ * starts the conduit again after it's been stopped. This will clear the
+ * server map if the conduit's port is zero (wildcard bind)
+ */
public void restart() throws ConnectionException {
- if (!stopped)
+ if (!stopped) {
return;
+ }
this.stats = null;
if (directChannel != null) {
this.stats = directChannel.getDMStats();
@@ -555,20 +611,21 @@ public class TCPConduit implements Runnable {
}
try {
this.conTable = ConnectionTable.create(this);
- }
- catch (IOException io) {
+ } catch (IOException io) {
throw new ConnectionException(LocalizedStrings.TCPConduit_UNABLE_TO_INITIALIZE_CONNECTION_TABLE.toLocalizedString(), io);
}
startAcceptor();
}
- /** this is the server socket listener thread's run loop */
+ /**
+ * this is the server socket listener thread's run loop
+ */
public void run() {
ConnectionTable.threadWantsSharedResources();
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "Starting P2P Listener on {}", id);
}
- for(;;) {
+ for (; ; ) {
SystemFailure.checkFailure();
if (stopper.cancelInProgress() != null) {
break;
@@ -586,49 +643,42 @@ public class TCPConduit implements Runnable {
Socket othersock = null;
try {
if (this.useNIO) {
- SocketChannel otherChannel = channel.accept();
+ SocketChannel otherChannel = channel.accept();
othersock = otherChannel.socket();
- }
- else {
+ } else {
try {
othersock = socket.accept();
- }
- catch (SSLException ex) {
+ } catch (SSLException ex) {
// SW: This is the case when there is a problem in P2P
// SSL configuration, so need to exit otherwise goes into an
// infinite loop just filling the logs
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_STOPPING_P2P_LISTENER_DUE_TO_SSL_CONFIGURATION_PROBLEM), ex);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_STOPPING_P2P_LISTENER_DUE_TO_SSL_CONFIGURATION_PROBLEM), ex);
break;
}
- SocketCreator.getDefaultInstance().configureServerSSLSocket(
- othersock);
+ SocketCreatorFactory.getClusterSSLSocketCreator().configureServerSSLSocket(othersock);
}
if (stopped) {
try {
if (othersock != null) {
othersock.close();
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
}
continue;
}
if (inhibitNewConnections) {
-// if (logger.isTraceEnabled(LogMarker.QA)) {
- logger.info("Test hook: inhibiting acceptance of connection {}", othersock);
-// }
+ // if (logger.isTraceEnabled(LogMarker.QA)) {
+ logger.info("Test hook: inhibiting acceptance of connection {}", othersock);
+ // }
othersock.close();
while (inhibitNewConnections && !stopped) {
this.stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
- try {
- Thread.sleep(2000);
- }
- catch (InterruptedException e) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
interrupted = true;
- }
- finally {
+ } finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
@@ -637,41 +687,32 @@ public class TCPConduit implements Runnable {
if (logger.isTraceEnabled(LogMarker.QA)) {
logger.trace(LogMarker.QA, "Test hook: finished inhibiting acceptance of connections");
}
- }
- else {
+ } else {
acceptConnection(othersock);
}
- }
- catch (ClosedByInterruptException cbie) {
+ } catch (ClosedByInterruptException cbie) {
//safe to ignore
- }
- catch (ClosedChannelException e) {
+ } catch (ClosedChannelException e) {
break; // we're dead
- }
- catch (CancelException e) {
+ } catch (CancelException e) {
break;
- }
- catch (Exception e) {
+ } catch (Exception e) {
if (!stopped) {
if (e instanceof SocketException && "Socket closed".equalsIgnoreCase(e.getMessage())) {
// safe to ignore; see bug 31156
if (!socket.isClosed()) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_SERVERSOCKET_THREW_SOCKET_CLOSED_EXCEPTION_BUT_SAYS_IT_IS_NOT_CLOSED), e);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_SERVERSOCKET_THREW_SOCKET_CLOSED_EXCEPTION_BUT_SAYS_IT_IS_NOT_CLOSED), e);
try {
socket.close();
createServerSocket();
- }
- catch (IOException ioe) {
+ } catch (IOException ioe) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.TCPConduit_UNABLE_TO_CLOSE_AND_RECREATE_SERVER_SOCKET), ioe);
// post 5.1.0x, this should force shutdown
try {
Thread.sleep(5000);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
// Don't reset; we're just exiting the thread
- logger.info(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_INTERRUPTED_AND_EXITING_WHILE_TRYING_TO_RECREATE_LISTENER_SOCKETS));
+ logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_INTERRUPTED_AND_EXITING_WHILE_TRYING_TO_RECREATE_LISTENER_SOCKETS));
return;
}
}
@@ -693,8 +734,7 @@ public class TCPConduit implements Runnable {
logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_SERVERSOCKET_CLOSED_REOPENING));
try {
createServerSocket();
- }
- catch (ConnectionException ex) {
+ } catch (ConnectionException ex) {
logger.warn(ex.getMessage(), ex);
}
}
@@ -708,19 +748,18 @@ public class TCPConduit implements Runnable {
private void acceptConnection(final Socket othersock) {
try {
this.hsPool.execute(new Runnable() {
- public void run() {
- basicAcceptConnection(othersock);
- }
- });
- }
- catch (RejectedExecutionException rejected) {
+ public void run() {
+ basicAcceptConnection(othersock);
+ }
+ });
+ } catch (RejectedExecutionException rejected) {
try {
othersock.close();
- }
- catch (IOException ignore) {
+ } catch (IOException ignore) {
}
}
}
+
private ConnectionTable getConTable() {
ConnectionTable result = this.conTable;
if (result == null) {
@@ -729,41 +768,40 @@ public class TCPConduit implements Runnable {
}
return result;
}
+
protected void basicAcceptConnection(Socket othersock) {
try {
getConTable().acceptConnection(othersock);
- }
- catch (IOException io) {
+ } catch (IOException io) {
// exception is logged by the Connection
if (!stopped) {
this.stats.incFailedAccept();
}
- }
- catch (ConnectionException ex) {
+ } catch (ConnectionException ex) {
// exception is logged by the Connection
if (!stopped) {
this.stats.incFailedAccept();
}
- }
- catch (CancelException e) {
- }
- catch (Exception e) {
+ } catch (CancelException e) {
+ } catch (Exception e) {
if (!stopped) {
-// if (e instanceof SocketException
-// && "Socket closed".equals(e.getMessage())) {
-// // safe to ignore; see bug 31156
-// }
-// else
+ // if (e instanceof SocketException
+ // && "Socket closed".equals(e.getMessage())) {
+ // // safe to ignore; see bug 31156
+ // }
+ // else
{
this.stats.incFailedAccept();
- logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1,
- new Object[] {othersock.getInetAddress(), e}), e);
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1, new Object[] {
+ othersock.getInetAddress(),
+ e
+ }), e);
}
}
//connections.cleanupLowWater();
}
}
-
+
/**
* return true if "new IO" classes are being used for the server socket
*/
@@ -776,22 +814,17 @@ public class TCPConduit implements Runnable {
* ordered connections
* @since GemFire 5.1
*/
- public void getThreadOwnedOrderedConnectionState(
- DistributedMember member,
- Map result)
- {
+ public void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) {
getConTable().getThreadOwnedOrderedConnectionState(member, result);
}
-
+
/**
* wait for the incoming connections identified by the keys in the
* argument to receive and dispatch the number of messages associated
* with the key
* @since GemFire 5.1
*/
- public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState)
- throws InterruptedException
- {
+ public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState) throws InterruptedException {
// if (Thread.interrupted()) throw new InterruptedException(); not necessary done in waitForThreadOwnedOrderedConnectionState
getConTable().waitForThreadOwnedOrderedConnectionState(member, channelState);
}
@@ -800,9 +833,8 @@ public class TCPConduit implements Runnable {
* connections send messageReceived when a message object has been
* read.
* @param bytesRead number of bytes read off of network to get this message
- */
- protected void messageReceived(Connection receiver, DistributionMessage message,
- int bytesRead) {
+ */
+ protected void messageReceived(Connection receiver, DistributionMessage message, int bytesRead) {
if (logger.isTraceEnabled()) {
logger.trace("{} received {} from {}", id, message, receiver);
}
@@ -816,12 +848,16 @@ public class TCPConduit implements Runnable {
}
}
- /** gets the address of this conduit's ServerSocket endpoint */
+ /**
+ * gets the address of this conduit's ServerSocket endpoint
+ */
public InetSocketAddress getId() {
return id;
}
- /** gets the actual port to which this conduit's ServerSocket is bound */
+ /**
+ * gets the actual port to which this conduit's ServerSocket is bound
+ */
public int getPort() {
return id.getPort();
}
@@ -832,14 +868,19 @@ public class TCPConduit implements Runnable {
public InternalDistributedMember getLocalAddress() {
return this.localAddr;
}
- /** gets the requested port that this TCPConduit bound to. This could
- be zero if a wildcard bind was done */
+
+ /**
+ * gets the requested port that this TCPConduit bound to. This could
+ * be zero if a wildcard bind was done
+ */
public int getBindPort() {
return port;
}
-
- /** gets the channel that is used to process non-DistributedMember messages */
+
+ /**
+ * gets the channel that is used to process non-DistributedMember messages
+ */
public DirectChannel getDirectChannel() {
return directChannel;
}
@@ -847,7 +888,7 @@ public class TCPConduit implements Runnable {
public void setLocalAddr(InternalDistributedMember addr) {
localAddr = addr;
}
-
+
public InternalDistributedMember getLocalAddr() {
return localAddr;
}
@@ -856,19 +897,21 @@ public class TCPConduit implements Runnable {
* Return a connection to the given member. This method must continue
* to attempt to create a connection to the given member as long as that
* member is in the membership view and the system is not shutting down.
- *
* @param memberAddress the IDS associated with the remoteId
* @param preserveOrder whether this is an ordered or unordered connection
* @param retry false if this is the first attempt
* @param startTime the time this operation started
* @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero)
* @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted (or zero)
+ *
* @return the connection
*/
- public Connection getConnection(InternalDistributedMember memberAddress, final boolean preserveOrder, boolean retry, long startTime,
- long ackTimeout, long ackSATimeout)
- throws java.io.IOException, DistributedSystemDisconnectedException
- {
+ public Connection getConnection(InternalDistributedMember memberAddress,
+ final boolean preserveOrder,
+ boolean retry,
+ long startTime,
+ long ackTimeout,
+ long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException {
//final boolean preserveOrder = (processorType == DistributionManager.SERIAL_EXECUTOR )|| (processorType == DistributionManager.PARTITIONED_REGION_EXECUTOR);
if (stopped) {
throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_THE_CONDUIT_IS_STOPPED.toLocalizedString());
@@ -877,181 +920,166 @@ public class TCPConduit implements Runnable {
Connection conn = null;
InternalDistributedMember memberInTrouble = null;
boolean breakLoop = false;
- for (;;) {
+ for (; ; ) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
- // If this is the second time through this loop, we had
- // problems. Tear down the connection so that it gets
- // rebuilt.
- if (retry || conn != null) { // not first time in loop
- if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) {
- throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
- }
- // bug35953: Member is still in view; we MUST NOT give up!
-
- // Pause just a tiny bit...
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- interrupted = true;
- stopper.checkCancelInProgress(e);
- }
-
- // try again after sleep
- if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) {
- // OK, the member left. Just register an error.
- throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
- }
-
- // Print a warning (once)
- if (memberInTrouble == null) {
- memberInTrouble = memberAddress;
- logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
- }
- else {
- if (logger.isDebugEnabled()) {
- logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
+ // If this is the second time through this loop, we had
+ // problems. Tear down the connection so that it gets
+ // rebuilt.
+ if (retry || conn != null) { // not first time in loop
+ if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) {
+ throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
}
- }
-
- // Close the connection (it will get rebuilt later).
- this.stats.incReconnectAttempts();
- if (conn != null) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}",
- conn, memberInTrouble);
- }
- conn.closeForReconnect("closing before retrying");
- }
- catch (CancelException ex) {
- throw ex;
+ // bug35953: Member is still in view; we MUST NOT give up!
+
+ // Pause just a tiny bit...
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ stopper.checkCancelInProgress(e);
}
- catch (Exception ex) {
+
+ // try again after sleep
+ if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) {
+ // OK, the member left. Just register an error.
+ throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString());
}
- }
- } // not first time in loop
-
- Exception problem = null;
- try {
- // Get (or regenerate) the connection
- // bug36202: this could generate a ConnectionException, so it
- // must be caught and retried
- boolean retryForOldConnection;
- boolean debugRetry = false;
- do {
- retryForOldConnection = false;
- conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout);
- if (conn == null) {
- // conduit may be closed - otherwise an ioexception would be thrown
- problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress));
- } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
+
+ // Print a warning (once)
+ if (memberInTrouble == null) {
+ memberInTrouble = memberAddress;
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble));
+ } else {
if (logger.isDebugEnabled()) {
- logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
+ logger.debug("Attempting TCP/IP reconnect to {}", memberInTrouble);
}
- conn.closeOldConnection("closing old connection");
- conn = null;
- retryForOldConnection = true;
- debugRetry = true;
}
- } while (retryForOldConnection);
- if (debugRetry && logger.isDebugEnabled()) {
- logger.debug("Done removing old connections");
- }
- // we have a connection; fall through and return it
- }
- catch (ConnectionException e) {
- // Race condition between acquiring the connection and attempting
- // to use it: another thread closed it.
- problem = e;
- // [sumedh] No need to retry since Connection.createSender has already
- // done retries and now member is really unreachable for some reason
- // even though it may be in the view
- breakLoop = true;
- }
- catch (IOException e) {
- problem = e;
- // bug #43962 don't keep trying to connect to an alert listener
- if (AlertAppender.isThreadAlerting()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Giving up connecting to alert listener {}", memberAddress);
+ // Close the connection (it will get rebuilt later).
+ this.stats.incReconnectAttempts();
+ if (conn != null) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}", conn, memberInTrouble);
+ }
+ conn.closeForReconnect("closing before retrying");
+ } catch (CancelException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ }
}
- breakLoop = true;
- }
- }
+ } // not first time in loop
- if (problem != null) {
- // Some problems are not recoverable; check and error out early.
- if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view
- // Bracket our original warning
- if (memberInTrouble != null) {
- // make this msg info to bracket warning
- logger.info(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
- }
- throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress));
- } // left the view
-
- if (membershipManager.shutdownInProgress()) { // shutdown in progress
- // Bracket our original warning
- if (memberInTrouble != null) {
- // make this msg info to bracket warning
- logger.info(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED, memberInTrouble));
+ Exception problem = null;
+ try {
+ // Get (or regenerate) the connection
+ // bug36202: this could generate a ConnectionException, so it
+ // must be caught and retried
+ boolean retryForOldConnection;
+ boolean debugRetry = false;
+ do {
+ retryForOldConnection = false;
+ conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout);
+ if (conn == null) {
+ // conduit may be closed - otherwise an ioexception would be thrown
+ problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress));
+ } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode());
+ }
+ conn.closeOldConnection("closing old connection");
+ conn = null;
+ retryForOldConnection = true;
+ debugRetry = true;
+ }
+ } while (retryForOldConnection);
+ if (debugRetry && logger.isDebugEnabled()) {
+ logger.debug("Done removing old connections");
}
- stopper.checkCancelInProgress(null);
- throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_ABANDONED_BECAUSE_SHUTDOWN_IS_IN_PROGRESS.toLocalizedString());
- } // shutdown in progress
-
- // Log the warning. We wait until now, because we want
- // to have m defined for a nice message...
- if (memberInTrouble == null) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1,
- new Object[] {memberAddress, problem}));
- memberInTrouble = memberAddress;
- }
- else {
- if (logger.isDebugEnabled()) {
- logger.debug("Error sending message to {}", memberAddress, problem);
+
+ // we have a connection; fall through and return it
+ } catch (ConnectionException e) {
+ // Race condition between acquiring the connection and attempting
+ // to use it: another thread closed it.
+ problem = e;
+ // [sumedh] No need to retry since Connection.createSender has already
+ // done retries and now member is really unreachable for some reason
+ // even though it may be in the view
+ breakLoop = true;
+ } catch (IOException e) {
+ problem = e;
+ // bug #43962 don't keep trying to connect to an alert listener
+ if (AlertAppender.isThreadAlerting()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Giving up connecting to alert listener {}", memberAddress);
+ }
+ breakLoop = true;
}
}
- if (breakLoop) {
- if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_THROWING_IOEXCEPTION_AFTER_FINDING_BREAKLOOP_TRUE),
- problem);
+ if (problem != null) {
+ // Some problems are not recoverable; check and error out early.
+ if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view
+ // Bracket our original warning
+ if (memberInTrouble != null) {
+ // make this msg info to bracket warning
+ logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble));
+ }
+ throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress));
+ } // left the view
+
+ if (membershipManager.shutdownInProgress()) { // shutdown in progress
+ // Bracket our original warning
+ if (memberInTrouble != null) {
+ // make this msg info to bracket warning
+ logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED, memberInTrouble));
+ }
+ stopper.checkCancelInProgress(null);
+ throw new DistributedSystemDisconnectedException(LocalizedStrings.TCPConduit_ABANDONED_BECAUSE_SHUTDOWN_IS_IN_PROGRESS.toLocalizedString());
+ } // shutdown in progress
+
+ // Log the warning. We wait until now, because we want
+ // to have m defined for a nice message...
+ if (memberInTrouble == null) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1, new Object[] {
+ memberAddress,
+ problem
+ }));
+ memberInTrouble = memberAddress;
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Error sending message to {}", memberAddress, problem);
+ }
}
- if (problem instanceof IOException) {
- throw (IOException)problem;
+
+ if (breakLoop) {
+ if (!problem.getMessage().startsWith("Cannot form connection to alert listener")) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_THROWING_IOEXCEPTION_AFTER_FINDING_BREAKLOOP_TRUE), problem);
+ }
+ if (problem instanceof IOException) {
+ throw (IOException) problem;
+ } else {
+ IOException ioe = new IOException(LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress));
+ ioe.initCause(problem);
+ throw ioe;
+ }
}
- else {
- IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress));
- ioe.initCause(problem);
- throw ioe;
+ // Retry the operation (indefinitely)
+ continue;
+ } // problem != null
+ // Success!
+
+ // Make sure our logging is bracketed if there was a problem
+ if (memberInTrouble != null) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0, memberInTrouble));
+ if (logger.isTraceEnabled()) {
+ logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
}
}
- // Retry the operation (indefinitely)
- continue;
- } // problem != null
- // Success!
-
- // Make sure our logging is bracketed if there was a problem
- if (memberInTrouble != null) {
- logger.info(LocalizedMessage.create(
- LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0,
- memberInTrouble));
- if (logger.isTraceEnabled()) {
- logger.trace("new connection is {} memberAddress={}", conn, memberAddress);
- }
- }
- return conn;
- }
- finally {
+ return conn;
+ } finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
@@ -1077,19 +1105,21 @@ public class TCPConduit implements Runnable {
}
}
}
+
/**
* Returns the distribution manager of the direct channel
*/
public DM getDM() {
return directChannel.getDM();
}
+
/**
* Closes any connections used to communicate with the given member
*/
public void removeEndpoint(DistributedMember mbr, String reason) {
removeEndpoint(mbr, reason, true);
}
-
+
public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) {
ConnectionTable ct = this.conTable;
if (ct == null) {
@@ -1097,13 +1127,15 @@ public class TCPConduit implements Runnable {
}
ct.removeEndpoint(mbr, reason, notifyDisconnect);
}
-
- /** check to see if there are still any receiver threads for the given end-point */
+
+ /**
+ * check to see if there are still any receiver threads for the given end-point
+ */
public boolean hasReceiversFor(DistributedMember endPoint) {
ConnectionTable ct = this.conTable;
return (ct != null) && ct.hasReceiversFor(endPoint);
}
-
+
protected class Stopper extends CancelCriterion {
/* (non-Javadoc)
@@ -1120,7 +1152,7 @@ public class TCPConduit implements Runnable {
}
return null;
}
-
+
/* (non-Javadoc)
* @see com.gemstone.gemfire.CancelCriterion#generateCancelledException(java.lang.Throwable)
*/
@@ -1144,14 +1176,14 @@ public class TCPConduit implements Runnable {
return result;
}
}
-
+
private final Stopper stopper = new Stopper();
-
+
public CancelCriterion getCancelCriterion() {
return stopper;
}
-
-
+
+
/**
* if the conduit is disconnected due to an abnormal condition, this
* will describe the reason
@@ -1160,7 +1192,7 @@ public class TCPConduit implements Runnable {
public Exception getShutdownCause() {
return this.shutdownCause;
}
-
+
/**
* ARB: Called by Connection before handshake reply is sent.
* Returns true if member is part of view, false if membership is not confirmed before timeout.
@@ -1168,20 +1200,20 @@ public class TCPConduit implements Runnable {
public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
return membershipManager.waitForMembershipCheck(remoteId);
}
-
+
/**
* simulate being sick
*/
public void beSick() {
-// this.inhibitNewConnections = true;
-// this.conTable.closeReceivers(true);
+ // this.inhibitNewConnections = true;
+ // this.conTable.closeReceivers(true);
}
-
+
/**
* simulate being healthy
*/
public void beHealthy() {
-// this.inhibitNewConnections = false;
+ // this.inhibitNewConnections = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
index 3af34e1..19679ad 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/JmxManagerAdvisee.java
@@ -19,7 +19,7 @@ package com.gemstone.gemfire.management.internal;
import com.gemstone.gemfire.CancelCriterion;
import com.gemstone.gemfire.distributed.internal.*;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.management.ManagementService;
import com.gemstone.gemfire.management.internal.JmxManagerAdvisor.JmxManagerProfile;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
index 9807456..bb9bfb4 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/ManagementAgent.java
@@ -16,15 +16,49 @@
*/
package com.gemstone.gemfire.management.internal;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.rmi.AlreadyBoundException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.RMIClientSocketFactory;
+import java.rmi.server.RMIServerSocketFactory;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Set;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+import javax.management.remote.rmi.RMIServerImpl;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+
+import org.apache.logging.log4j.Logger;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+
import com.gemstone.gemfire.GemFireConfigException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.GemFireVersion;
-import com.gemstone.gemfire.internal.SocketCreator;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.lang.StringUtils;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.net.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
import com.gemstone.gemfire.internal.security.shiro.JMXShiroAuthenticator;
import com.gemstone.gemfire.internal.tcp.TCPConduit;
import com.gemstone.gemfire.management.ManagementException;
@@ -34,44 +68,18 @@ import com.gemstone.gemfire.management.internal.security.AccessControlMBean;
import com.gemstone.gemfire.management.internal.security.MBeanServerWrapper;
import com.gemstone.gemfire.management.internal.security.ResourceConstants;
import com.gemstone.gemfire.management.internal.unsafe.ReadOpFileAccessController;
-import org.apache.logging.log4j.Logger;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-
-import javax.management.*;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXServiceURL;
-import javax.management.remote.rmi.RMIConnectorServer;
-import javax.management.remote.rmi.RMIJRMPServerImpl;
-import javax.management.remote.rmi.RMIServerImpl;
-import javax.rmi.ssl.SslRMIClientSocketFactory;
-import java.io.IOException;
-import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.RMIClientSocketFactory;
-import java.rmi.server.RMIServerSocketFactory;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.HashMap;
-import java.util.Set;
/**
* Agent implementation that controls the JMX server end points for JMX clients
* to connect, such as an RMI server.
- *
+ * <p>
* The ManagementAgent could be used in a loner or GemFire client to define and
* control JMX server end points for the Platform MBeanServer and the GemFire
* MBeans hosted within it.
- *
* @since GemFire 7.0
*/
public class ManagementAgent {
+
private static final Logger logger = LogService.getLogger();
/**
@@ -115,9 +123,10 @@ public class ManagementAgent {
}
private boolean isServerNode(GemFireCacheImpl cache) {
- return (cache.getDistributedSystem().getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE
- && cache.getDistributedSystem().getDistributedMember().getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE && !cache
- .isClient());
+ return (cache.getDistributedSystem().getDistributedMember().getVmKind() != DistributionManager.LOCATOR_DM_TYPE && cache.getDistributedSystem()
+ .getDistributedMember()
+ .getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE && !cache
+ .isClient());
}
public synchronized void startAgent(GemFireCacheImpl cache) {
@@ -128,8 +137,7 @@ public class ManagementAgent {
startHttpService(isServerNode(cache));
} else {
if (logger.isDebugEnabled()) {
- logger
- .debug("Developer REST APIs webapp is already running, Not Starting M&M REST and pulse!");
+ logger.debug("Developer REST APIs webapp is already running, Not Starting M&M REST and pulse!");
}
}
@@ -146,8 +154,9 @@ public class ManagementAgent {
public synchronized void stopAgent() {
stopHttpService();
- if (!this.running)
+ if (!this.running) {
return;
+ }
if (logger.isDebugEnabled()) {
logger.debug("Stopping jmx manager agent");
@@ -167,15 +176,13 @@ public class ManagementAgent {
private AgentUtil agentUtil = new AgentUtil(GEMFIRE_VERSION);
private void startHttpService(boolean isServer) {
- final SystemManagementService managementService = (SystemManagementService) ManagementService
- .getManagementService(CacheFactory.getAnyInstance());
+ final SystemManagementService managementService = (SystemManagementService) ManagementService.getManagementService(CacheFactory.getAnyInstance());
final ManagerMXBean managerBean = managementService.getManagerMXBean();
if (this.config.getHttpServicePort() != 0) {
if (logger.isDebugEnabled()) {
- logger.debug("Attempting to start HTTP service on port ({}) at bind-address ({})...",
- this.config.getHttpServicePort(), this.config.getHttpServiceBindAddress());
+ logger.debug("Attempting to start HTTP service on port ({}) at bind-address ({})...", this.config.getHttpServicePort(), this.config.getHttpServiceBindAddress());
}
// Find the Management WAR file
@@ -195,8 +202,7 @@ public class ManagementAgent {
if (logger.isDebugEnabled()) {
logger.debug(message);
}
- }
- else if (isCustomAuthenticator()){
+ } else if (isCustomAuthenticator()) {
System.setProperty("spring.profiles.active", "pulse.authentication.gemfire");
}
@@ -218,15 +224,11 @@ public class ManagementAgent {
boolean isRestWebAppAdded = false;
- this.httpServer = JettyHelper.initJetty(bindAddress, port,
- this.config.getHttpServiceSSLEnabled(),
- this.config.getHttpServiceSSLRequireAuthentication(),
- this.config.getHttpServiceSSLProtocols(), this.config.getHttpServiceSSLCiphers(),
- this.config.getHttpServiceSSLProperties());
+ this.httpServer = JettyHelper.initJetty(bindAddress, port, this.config.getHttpServiceSSLEnabled(), this.config.getHttpServiceSSLRequireAuthentication(), this.config
+ .getHttpServiceSSLProtocols(), this.config.getHttpServiceSSLCiphers(), this.config.getHttpServiceSSLProperties());
if (agentUtil.isWebApplicationAvailable(gemfireWar)) {
- this.httpServer = JettyHelper
- .addWebApplication(this.httpServer, "/gemfire", gemfireWar);
+ this.httpServer = JettyHelper.addWebApplication(this.httpServer, "/gemfire", gemfireWar);
}
if (agentUtil.isWebApplicationAvailable(pulseWar)) {
@@ -235,8 +237,7 @@ public class ManagementAgent {
if (isServer && this.config.getStartDevRestApi()) {
if (agentUtil.isWebApplicationAvailable(gemfireAPIWar)) {
- this.httpServer = JettyHelper.addWebApplication(this.httpServer, "/gemfire-api",
- gemfireAPIWar);
+ this.httpServer = JettyHelper.addWebApplication(this.httpServer, "/gemfire-api", gemfireAPIWar);
isRestWebAppAdded = true;
}
} else {
@@ -248,8 +249,7 @@ public class ManagementAgent {
}
if (logger.isDebugEnabled()) {
- logger.debug("Starting HTTP embedded server on port ({}) at bind-address ({})...",
- ((ServerConnector) this.httpServer.getConnectors()[0]).getPort(), bindAddress);
+ logger.debug("Starting HTTP embedded server on port ({}) at bind-address ({})...", ((ServerConnector) this.httpServer.getConnectors()[0]).getPort(), bindAddress);
}
System.setProperty(PULSE_EMBEDDED_PROP, "true");
@@ -259,8 +259,7 @@ public class ManagementAgent {
// now, that Tomcat has been started, we can set the URL used by web
// clients to connect to Pulse
if (agentUtil.isWebApplicationAvailable(pulseWar)) {
- managerBean.setPulseURL("http://".concat(getHost(bindAddress)).concat(":")
- .concat(String.valueOf(port)).concat("/pulse/"));
+ managerBean.setPulseURL("http://".concat(getHost(bindAddress)).concat(":").concat(String.valueOf(port)).concat("/pulse/"));
}
// set cache property for developer REST service running
@@ -278,15 +277,13 @@ public class ManagementAgent {
}
} catch (Exception e) {
stopHttpService();// Jetty needs to be stopped even if it has failed to
- // start. Some of the threads are left behind even if
- // server.start() fails due to an exception
- setStatusMessage(managerBean, "HTTP service failed to start with "
- + e.getClass().getSimpleName() + " '" + e.getMessage() + "'");
+ // start. Some of the threads are left behind even if
+ // server.start() fails due to an exception
+ setStatusMessage(managerBean, "HTTP service failed to start with " + e.getClass().getSimpleName() + " '" + e.getMessage() + "'");
throw new ManagementException("HTTP service failed to start", e);
}
} else {
- setStatusMessage(managerBean,
- "Embedded HTTP server configured not to start (http-service-port=0) or (jmx-manager-http-port=0)");
+ setStatusMessage(managerBean, "Embedded HTTP server configured not to start (http-service-port=0) or (jmx-manager-http-port=0)");
}
}
@@ -318,8 +315,7 @@ public class ManagementAgent {
try {
this.httpServer.destroy();
} catch (Exception ignore) {
- logger.error("Failed to properly release resources held by the HTTP service: {}",
- ignore.getMessage(), ignore);
+ logger.error("Failed to properly release resources held by the HTTP service: {}", ignore.getMessage(), ignore);
} finally {
this.httpServer = null;
System.clearProperty("catalina.base");
@@ -357,14 +353,10 @@ public class ManagementAgent {
final boolean ssl = this.config.getJmxManagerSSLEnabled();
if (logger.isDebugEnabled()) {
- logger.debug("Starting jmx manager agent on port {}{}", port,
- (bindAddr != null ? (" bound to " + bindAddr) : "") + (ssl ? " using SSL" : ""));
+ logger.debug("Starting jmx manager agent on port {}{}", port, (bindAddr != null ? (" bound to " + bindAddr) : "") + (ssl ? " using SSL" : ""));
}
- final SocketCreator sc = SocketCreator.createNonDefaultInstance(ssl,
- this.config.getJmxManagerSSLRequireAuthentication(),
- this.config.getJmxManagerSSLProtocols(), this.config.getJmxManagerSSLCiphers(),
- this.config.getJmxSSLProperties());
+ final SocketCreator sc = SocketCreatorFactory.getJMXManagerSSLSocketCreator();
RMIClientSocketFactory csf = ssl ? new SslRMIClientSocketFactory() : null;// RMISocketFactory.getDefaultSocketFactory();
// new GemFireRMIClientSocketFactory(sc, getLogger());
RMIServerSocketFactory ssf = new GemFireRMIServerSocketFactory(sc, bindAddr);
@@ -413,8 +405,7 @@ public class ManagementAgent {
//
// We construct a JMXServiceURL corresponding to what we have done
// for our stub...
- final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://" + hostname + ":" + port
- + "/jndi/rmi://" + hostname + ":" + port + "/jmxrmi");
+ final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://" + hostname + ":" + port + "/jndi/rmi://" + hostname + ":" + port + "/jmxrmi");
// Create an RMI connector server with the JMXServiceURL
//
@@ -441,7 +432,7 @@ public class ManagementAgent {
};
String shiroConfig = this.config.getShiroInit();
- if (! StringUtils.isBlank(shiroConfig) || isCustomAuthenticator()) {
+ if (!StringUtils.isBlank(shiroConfig) || isCustomAuthenticator()) {
shiroAuthenticator = new JMXShiroAuthenticator();
env.put(JMXConnectorServer.AUTHENTICATOR, shiroAuthenticator);
cs.addNotificationListener(shiroAuthenticator, null, cs.getAttributes());
@@ -450,9 +441,7 @@ public class ManagementAgent {
MBeanServerWrapper mBeanServerWrapper = new MBeanServerWrapper();
cs.setMBeanServerForwarder(mBeanServerWrapper);
registerAccessControlMBean();
- }
-
- else {
+ } else {
/* Disable the old authenticator mechanism */
String pwFile = this.config.getJmxManagerPasswordFile();
if (pwFile != null && pwFile.length() > 0) {
@@ -511,11 +500,11 @@ public class ManagementAgent {
return factoryName != null && !factoryName.isEmpty();
}
- private static class GemFireRMIClientSocketFactory implements RMIClientSocketFactory,
- Serializable {
+ private static class GemFireRMIClientSocketFactory implements RMIClientSocketFactory, Serializable {
+
private static final long serialVersionUID = -7604285019188827617L;
- private/* final hack to prevent serialization */transient SocketCreator sc;
+ private/* final hack to prevent serialization */ transient SocketCreator sc;
public GemFireRMIClientSocketFactory(SocketCreator sc) {
this.sc = sc;
@@ -525,12 +514,14 @@ public class ManagementAgent {
public Socket createSocket(String host, int port) throws IOException {
return this.sc.connectForClient(host, port, 0/* no timeout */);
}
- };
+ }
+
+ ;
+
+ private static class GemFireRMIServerSocketFactory implements RMIServerSocketFactory, Serializable {
- private static class GemFireRMIServerSocketFactory implements RMIServerSocketFactory,
- Serializable {
private static final long serialVersionUID = -811909050641332716L;
- private/* final hack to prevent serialization */transient SocketCreator sc;
+ private/* final hack to prevent serialization */ transient SocketCreator sc;
private final InetAddress bindAddr;
public GemFireRMIServerSocketFactory(SocketCreator sc, InetAddress bindAddr) {
@@ -542,5 +533,7 @@ public class ManagementAgent {
public ServerSocket createServerSocket(int port) throws IOException {
return this.sc.createServerSocket(port, TCPConduit.getBackLog(), this.bindAddr);
}
- };
+ }
+
+ ;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
index 8b41548..53f0894 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/RestAgent.java
@@ -20,7 +20,7 @@ package com.gemstone.gemfire.management.internal;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.GemFireVersion;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
import com.gemstone.gemfire.internal.logging.LogService;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
index 89cc4f0..efc48fa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
@@ -43,6 +43,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppender;
import com.gemstone.gemfire.internal.logging.log4j.LogWriterAppenders;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
import com.gemstone.gemfire.internal.offheap.OffHeapMemoryStats;
import com.gemstone.gemfire.internal.process.PidUnavailableException;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
index c67a4bc..bcdbdc5 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/LauncherLifecycleCommands.java
@@ -32,7 +32,7 @@ import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
import com.gemstone.gemfire.internal.DistributionLocator;
import com.gemstone.gemfire.internal.GemFireVersion;
import com.gemstone.gemfire.internal.OSProcess;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberPattern;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.lang.ClassUtils;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java b/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
index ba0b479..6e81309 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/memcached/GemFireMemcachedServer.java
@@ -36,7 +36,7 @@ import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.memcached.ConnectionHandler;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
index 81f87d5..00a38db 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/redis/GemFireRedisServer.java
@@ -22,7 +22,7 @@ import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.hll.HyperLogLogPlus;
import com.gemstone.gemfire.internal.redis.*;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
index 3ae6cf6..f959f67 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -61,6 +61,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.Invoke;
@@ -132,6 +133,7 @@ public class ConnectionPoolDUnitTest extends JUnit4CacheTestCase {
}
});
postTearDownConnectionPoolDUnitTest();
+
}
protected void postTearDownConnectionPoolDUnitTest() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
index ee5cc62..c9c04fd 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/CacheServerSSLConnectionDUnitTest.java
@@ -37,6 +37,7 @@ import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
import com.gemstone.gemfire.security.AuthenticationRequiredException;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.IgnoredException;
@@ -223,12 +224,14 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
public static void closeCacheTask(){
if (instance != null && instance.cache != null) {
instance.cache.close();
+ SocketCreatorFactory.close();
}
}
public static void closeClientCacheTask(){
if (instance != null && instance.clientCache != null) {
instance.clientCache.close();
+ SocketCreatorFactory.close();
}
}
@@ -370,6 +373,7 @@ public class CacheServerSSLConnectionDUnitTest extends JUnit4DistributedTestCase
VM clientVM = host.getVM(2);
clientVM.invoke(() -> closeClientCacheTask());
serverVM.invoke(() -> closeCacheTask());
+
}
}