You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/07/12 20:01:05 UTC
[6/8] incubator-geode git commit: GEODE-420: Clean up of
SocketCreator code in tests. SocketCreatorFactory is currently a singleton;
this is to be amended at a later stage.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
index de191e8..2c0cde6 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalLocator.java
@@ -34,7 +34,7 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.locator.PeerLoca
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpHandler;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.wan.WANServiceProvider;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
index 7f8eed6..3fe1cca 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/LonerDistributionManager.java
@@ -26,7 +26,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.InternalLogWriter;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocation.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocation.java
index 0e7430c..622f7ee 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocation.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
/**
* Represents the location of a bridge server. This class is
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocator.java
index f5b774f..85db34d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/ServerLocator.java
@@ -53,7 +53,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpHandler;
import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.CacheServerAdvisor.CacheServerProfile;
import com.gemstone.gemfire.internal.cache.ControllerAdvisor;
import com.gemstone.gemfire.internal.cache.ControllerAdvisor.ControllerProfile;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
index 05810e5..ec2dec1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java
@@ -24,6 +24,8 @@ import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributes
import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.net.SocketCreator;
+
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
index f1caa5d..d7b09aa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java
@@ -25,7 +25,7 @@ import com.gemstone.gemfire.distributed.internal.*;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
import com.gemstone.gemfire.i18n.StringId;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.cache.DirectReplyMessage;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
index 2d8b8e1..d24fcf7 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
@@ -28,6 +28,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.internal.*;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import java.io.*;
import java.net.Inet4Address;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
index 4328bed..1406d60 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMemberFactory.java
@@ -24,7 +24,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionException;
import com.gemstone.gemfire.distributed.internal.LocatorStats;
import com.gemstone.gemfire.distributed.internal.membership.*;
import com.gemstone.gemfire.distributed.internal.membership.gms.locator.GMSLocator;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
index 6478c70..b3862e1 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
@@ -26,8 +26,7 @@ import java.util.StringTokenizer;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
public class GMSUtil {
static Logger logger = Services.getLogger();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
index 2a41574..de9a9ff 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
@@ -19,7 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms;
import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
import java.net.InetAddress;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index e0f22a5..7fc8b3a 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -52,6 +52,7 @@ import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
import com.gemstone.gemfire.distributed.internal.DMStats;
import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.NetView;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
@@ -63,8 +64,9 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.Heartbe
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
import com.gemstone.gemfire.internal.ConnectionWatcher;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
/**
* Failure Detection
@@ -493,9 +495,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
*/
boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
Socket clientSocket = null;
+ InternalDistributedSystem internalDistributedSystem = InternalDistributedSystem.getConnectedInstance();
try {
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
- clientSocket = SocketCreator.getDefaultInstance().connect(suspectMember.getInetAddress(), port,
+ clientSocket = SocketCreatorFactory.getClusterSSLSocketCreator().connect(suspectMember.getInetAddress(), port,
(int)memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
clientSocket.setTcpNoDelay(true);
return doTCPCheckMember(suspectMember, clientSocket);
@@ -644,7 +647,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
ServerSocket serverSocket = null;
try {
- serverSocket = SocketCreator.getDefaultInstance().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/,
+ serverSocket = SocketCreatorFactory.getClusterSSLSocketCreator().createServerSocketUsingPortRange(socketAddress, 50/*backlog*/,
true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
socketPort = serverSocket.getLocalPort();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java
index 1f97001..3948c51 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java
@@ -19,7 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import org.jgroups.Global;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.UUID;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index fc81049..07a72db 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.logging.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpClient.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpClient.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpClient.java
index c770238..f27618f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpClient.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpClient.java
@@ -30,54 +30,51 @@ import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.UnsupportedVersionException;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.VersionedDataInputStream;
import com.gemstone.gemfire.internal.VersionedDataOutputStream;
import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
/**
* Client for the TcpServer. These methods were refactored out of GossipClient,
- * because they are required for the server regardess of whether we are using the
+ * because they are required for the server regardless of whether we are using the
* GossipServer or the ServerLocator.
- *
+ * <p>
* TODO - refactor this to support keep-alive connections to the server. requestToServer
* probably shouldn't a static method.
* @since GemFire 5.7
- *
*/
public class TcpClient {
+
private static final Logger logger = LogService.getLogger();
-
private static final int REQUEST_TIMEOUT = 60 * 2 * 1000;
-
private static Map<InetSocketAddress, Short> serverVersions = new HashMap<InetSocketAddress, Short>();
-
/**
* Stops the TcpServer running on a given host and port
*/
public static void stop(InetAddress addr, int port) throws java.net.ConnectException {
try {
- ShutdownRequest request =new ShutdownRequest();
+ ShutdownRequest request = new ShutdownRequest();
TcpClient.requestToServer(addr, port, request, REQUEST_TIMEOUT);
- }
- catch (java.net.ConnectException ce) {
+ } catch (java.net.ConnectException ce) {
// must not be running, rethrow so the caller can handle.
// In most cases this Exception should be ignored.
throw ce;
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
logger.error("TcpClient.stop(): exception connecting to locator " + addr + ":" + port + ": " + ex);
}
}
- /** Contacts the gossip server running on the given host,
- * and port and gets information about it. Two <code>String</code>s
- * are returned: the first string is the working directory of the
- * locator and the second string is the product directory of the
- * locator.
- */
+ /**
+ * Contacts the gossip server running on the given host,
+ * and port and gets information about it. Two <code>String</code>s
+ * are returned: the first string is the working directory of the
+ * locator and the second string is the product directory of the
+ * locator.
+ */
public static String[] getInfo(InetAddress addr, int port) {
try {
InfoRequest request = new InfoRequest();
@@ -85,27 +82,28 @@ public class TcpClient {
return response.getInfo();
} catch (java.net.ConnectException ignore) {
return null;
- } catch(Exception ex) {
+ } catch (Exception ex) {
logger.error("TcpClient.getInfo(): exception connecting to locator " + addr + ":" + port + ": " + ex);
return null;
}
-
+
}
public static Object requestToServer(InetAddress addr, int port, Object request, int timeout) throws IOException, ClassNotFoundException {
return requestToServer(addr, port, request, timeout, true);
}
-
- public static Object requestToServer(InetAddress addr, int port, Object request, int timeout, boolean replyExpected) throws IOException, ClassNotFoundException {
+
+ public static Object requestToServer(InetAddress addr, int port, Object request, int timeout, boolean replyExpected)
+ throws IOException, ClassNotFoundException {
InetSocketAddress ipAddr;
if (addr == null) {
ipAddr = new InetSocketAddress(port);
} else {
ipAddr = new InetSocketAddress(addr, port); // fix for bug 30810
}
-
+
long giveupTime = System.currentTimeMillis() + timeout;
-
+
// Get the GemFire version of the TcpServer first, before sending any other request.
short serverVersion = getServerVersion(ipAddr, timeout).shortValue();
@@ -124,19 +122,19 @@ public class TcpClient {
if (newTimeout <= 0) {
return null;
}
-
+
logger.debug("TcpClient sending {} to {}", request, ipAddr);
- Socket sock=SocketCreator.getDefaultInstance().connect(ipAddr.getAddress(), ipAddr.getPort(), (int)newTimeout, null, false);
- sock.setSoTimeout((int)newTimeout);
+ Socket sock = SocketCreatorFactory.getClusterSSLSocketCreator().connect(ipAddr.getAddress(), ipAddr.getPort(), (int) newTimeout, null, false);
+ sock.setSoTimeout((int) newTimeout);
DataOutputStream out = null;
try {
- out=new DataOutputStream(sock.getOutputStream());
-
+ out = new DataOutputStream(sock.getOutputStream());
+
if (serverVersion < Version.CURRENT_ORDINAL) {
out = new VersionedDataOutputStream(out, Version.fromOrdinalNoThrow(serverVersion, false));
}
-
+
out.writeInt(gossipVersion);
if (gossipVersion > TcpServer.getOldGossipVersion()) {
out.writeShort(serverVersion);
@@ -146,7 +144,7 @@ public class TcpClient {
if (replyExpected) {
DataInputStream in = new DataInputStream(sock.getInputStream());
- in = new VersionedDataInputStream(in, Version.fromOrdinal(serverVersion, false));
+ in = new VersionedDataInputStream(in, Version.fromOrdinal(serverVersion, false));
try {
Object response = DataSerializer.readObject(in);
logger.debug("received response: {}", response);
@@ -154,15 +152,12 @@ public class TcpClient {
} catch (EOFException ex) {
throw new EOFException("Locator at " + ipAddr + " did not respond. This is normal if the locator was shutdown. If it wasn't check its log for exceptions.");
}
- }
- else {
+ } else {
return null;
}
} catch (UnsupportedVersionException ex) {
if (logger.isDebugEnabled()) {
- logger.debug("Remote TcpServer version: " + serverVersion
- + " is higher than local version: " + Version.CURRENT_ORDINAL
- + ". This is never expected as remoteVersion");
+ logger.debug("Remote TcpServer version: " + serverVersion + " is higher than local version: " + Version.CURRENT_ORDINAL + ". This is never expected as remoteVersion");
}
return null;
} finally {
@@ -175,7 +170,7 @@ public class TcpClient {
sock.setSoLinger(true, 0);
}
sock.close();
- } catch(Exception e) {
+ } catch (Exception e) {
logger.error("Error closing socket ", e);
}
if (out != null) {
@@ -190,22 +185,22 @@ public class TcpClient {
Short serverVersion = null;
// Get GemFire version of TcpServer first, before sending any other request.
- synchronized(serverVersions) {
+ synchronized (serverVersions) {
serverVersion = serverVersions.get(ipAddr);
}
if (serverVersion != null) {
return serverVersion;
}
-
+
gossipVersion = TcpServer.getOldGossipVersion();
-
- Socket sock=SocketCreator.getDefaultInstance().connect(ipAddr.getAddress(), ipAddr.getPort(), timeout, null, false);
+
+ Socket sock = SocketCreatorFactory.getClusterSSLSocketCreator().connect(ipAddr.getAddress(), ipAddr.getPort(), timeout, null, false);
sock.setSoTimeout(timeout);
-
+
try {
- DataOutputStream out=new DataOutputStream(sock.getOutputStream());
+ DataOutputStream out = new DataOutputStream(sock.getOutputStream());
out = new VersionedDataOutputStream(out, Version.GFE_57);
-
+
out.writeInt(gossipVersion);
VersionRequest verRequest = new VersionRequest();
@@ -213,12 +208,12 @@ public class TcpClient {
out.flush();
DataInputStream in = new DataInputStream(sock.getInputStream());
- in = new VersionedDataInputStream(in, Version.GFE_57);
+ in = new VersionedDataInputStream(in, Version.GFE_57);
try {
VersionResponse response = DataSerializer.readObject(in);
if (response != null) {
serverVersion = Short.valueOf(response.getVersionOrdinal());
- synchronized(serverVersions) {
+ synchronized (serverVersions) {
serverVersions.put(ipAddr, serverVersion);
}
return serverVersion;
@@ -230,14 +225,14 @@ public class TcpClient {
try {
sock.setSoLinger(true, 0); // initiate an abort on close to shut down the server's socket
sock.close();
- } catch(Exception e) {
+ } catch (Exception e) {
logger.error("Error closing socket ", e);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Locator " + ipAddr + " did not respond to a request for its version. I will assume it is using v5.7 for safety.");
}
- synchronized(serverVersions) {
+ synchronized (serverVersions) {
serverVersions.put(ipAddr, Version.GFE_57.ordinal());
}
return Short.valueOf(Version.GFE_57.ordinal());
@@ -246,9 +241,9 @@ public class TcpClient {
private TcpClient() {
//static class
}
-
+
public static void clearStaticData() {
- synchronized(serverVersions) {
+ synchronized (serverVersions) {
serverVersions.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java
index ceb5af8..cf048bf 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java
@@ -16,45 +16,71 @@
*/
package com.gemstone.gemfire.distributed.internal.tcpserver;
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.distributed.internal.*;
-import com.gemstone.gemfire.internal.*;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-import javax.net.ssl.SSLException;
-import java.io.*;
-import java.net.*;
-import java.util.*;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URL;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.net.ssl.SSLException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.PoolStatHelper;
+import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
+import com.gemstone.gemfire.distributed.internal.SharedConfiguration;
+import com.gemstone.gemfire.internal.DSFIDFactory;
+import com.gemstone.gemfire.internal.GemFireVersion;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.VersionedDataInputStream;
+import com.gemstone.gemfire.internal.VersionedDataOutputStream;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.net.SocketCreator;
+import com.gemstone.gemfire.internal.net.SocketCreatorFactory;
/**
* TCP server which listens on a port and delegates requests to a request
* handler. The server uses expects messages containing a global version number,
* followed by a DataSerializable object
- *
+ * <p>
* This code was factored out of GossipServer.java to allow multiple handlers to
* share the same gossip server port.
- *
* @since GemFire 5.7
*/
public class TcpServer {
+
/**
* The version of the tcp server protocol
* <p>
* This should be incremented if the gossip message structures change
- *
+ * <p>
* 1000 - gemfire 5.5 - using java serialization
* 1001 - 5.7 - using DataSerializable and supporting server locator messages.
* 1002 - 7.1 - sending GemFire version along with GOSSIP_VERSION in each request.
- *
+ * <p>
* with the addition of support for all old versions of clients you can no
* longer change this version number
*/
@@ -63,7 +89,7 @@ public class TcpServer {
// This GOSSIPVERSION is used in _getVersionForAddress request for getting GemFire version of a GossipServer.
public final static int OLDGOSSIPVERSION = 1001;
- private static/* GemStoneAddition */final Map GOSSIP_TO_GEMFIRE_VERSION_MAP = new HashMap();
+ private static/* GemStoneAddition */ final Map GOSSIP_TO_GEMFIRE_VERSION_MAP = new HashMap();
// For test purpose only
public static boolean isTesting = false;
@@ -74,11 +100,11 @@ public class TcpServer {
public static final long SHUTDOWN_WAIT_TIME = 60 * 1000;
private static int MAX_POOL_SIZE = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.MAX_POOL_SIZE", 100).intValue();
private static int POOL_IDLE_TIMEOUT = 60 * 1000;
-
+
private static final Logger log = LogService.getLogger();
- protected/*GemStoneAddition*/ final/*GemStoneAddition*/ static int READ_TIMEOUT = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.READ_TIMEOUT", 60 * 1000).intValue();
+ protected/*GemStoneAddition*/ final/*GemStoneAddition*/ static int READ_TIMEOUT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.READ_TIMEOUT", 60 * 1000)
+ .intValue();
//This is for backwards compatibility. The p2p.backlog flag used to be the only way to configure the locator backlog.
private static final int P2P_BACKLOG = Integer.getInteger("p2p.backlog", 1000).intValue();
private static final int BACKLOG = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.BACKLOG", P2P_BACKLOG).intValue();
@@ -90,12 +116,14 @@ public class TcpServer {
private volatile boolean shuttingDown = false; // GemStoneAddition
private final PoolStatHelper poolHelper;
private/*GemStoneAddition*/ final TcpHandler handler;
-
+
private PooledExecutorWithDMStats executor;
private final ThreadGroup threadGroup;
private final String threadName;
private volatile Thread serverThread;
+ private SocketCreator socketCreator;
+
/**
* GemStoneAddition - Initialize versions map.
* Warning: This map must be compatible with all GemFire versions being
@@ -109,8 +137,14 @@ public class TcpServer {
GOSSIP_TO_GEMFIRE_VERSION_MAP.put(OLDGOSSIPVERSION, Version.GFE_57.ordinal());
}
- public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
- DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper, ThreadGroup threadGroup, String threadName) {
+ public TcpServer(int port,
+ InetAddress bind_address,
+ Properties sslConfig,
+ DistributionConfigImpl cfg,
+ TcpHandler handler,
+ PoolStatHelper poolHelper,
+ ThreadGroup threadGroup,
+ String threadName) {
this.port = port;
this.bind_address = bind_address;
this.handler = handler;
@@ -130,20 +164,22 @@ public class TcpServer {
}
cfg = new DistributionConfigImpl(sslConfig);
}
- SocketCreator.getDefaultInstance(cfg);
+
+ //TODO Udo: How would I handle this case where the cfg is empty???
+ this.socketCreator = SocketCreatorFactory.getClusterSSLSocketCreator();
}
private static PooledExecutorWithDMStats createExecutor(PoolStatHelper poolHelper, final ThreadGroup threadGroup) {
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger threadNum = new AtomicInteger();
-
+
public Thread newThread(Runnable r) {
Thread thread = new Thread(threadGroup, r, "locator request thread[" + threadNum.incrementAndGet() + "]");
thread.setDaemon(true);
return thread;
}
};
-
+
return new PooledExecutorWithDMStats(new SynchronousQueue(), MAX_POOL_SIZE, poolHelper, factory, POOL_IDLE_TIMEOUT, new ThreadPoolExecutor.CallerRunsPolicy());
}
@@ -152,22 +188,23 @@ public class TcpServer {
this.handler.restarting(ds, cache, sharedConfig);
startServerThread();
this.executor = createExecutor(this.poolHelper, this.threadGroup);
- this.log.info("TcpServer@"+System.identityHashCode(this)+" restarting: completed. Server thread="+serverThread+"@"+System.identityHashCode(serverThread)+";alive="+serverThread.isAlive());
+ this.log.info("TcpServer@" + System.identityHashCode(this) + " restarting: completed. Server thread=" + serverThread + "@" + System.identityHashCode(serverThread) + ";alive=" + serverThread
+ .isAlive());
}
-
+
public void start() throws IOException {
this.shuttingDown = false;
startServerThread();
handler.init(this);
}
-
+
private void startServerThread() throws IOException {
if (srv_sock == null || srv_sock.isClosed()) {
if (bind_address == null) {
- srv_sock = SocketCreator.getDefaultInstance().createServerSocket(port, BACKLOG);
+ srv_sock = socketCreator.createServerSocket(port, BACKLOG);
bind_address = srv_sock.getInetAddress();
} else {
- srv_sock = SocketCreator.getDefaultInstance().createServerSocket(port, BACKLOG, bind_address);
+ srv_sock = socketCreator.createServerSocket(port, BACKLOG, bind_address);
}
if (log.isInfoEnabled()) {
@@ -187,36 +224,35 @@ public class TcpServer {
serverThread.start();
}
}
-
+
public void join(long millis) throws InterruptedException {
- if(serverThread != null) {
+ if (serverThread != null) {
serverThread.join(millis);
}
}
-
+
public void join() throws InterruptedException {
-// this.log.info("TcpServer@"+System.identityHashCode(this)+" join() invoked. Server thread="+serverThread+"@"+System.identityHashCode(serverThread)+";alive="+serverThread.isAlive());
- if(serverThread != null) {
+ // this.log.info("TcpServer@"+System.identityHashCode(this)+" join() invoked. Server thread="+serverThread+"@"+System.identityHashCode(serverThread)+";alive="+serverThread.isAlive());
+ if (serverThread != null) {
serverThread.join();
}
}
-
+
public boolean isAlive() {
return serverThread != null && serverThread.isAlive();
}
-
+
public boolean isShuttingDown() {
return this.shuttingDown;
}
-
+
public SocketAddress getBindAddress() {
- return srv_sock.getLocalSocketAddress();
+ return srv_sock.getLocalSocketAddress();
}
/**
* Returns the value of the bound port. If the server was initialized with a port of 0 indicating that any
* ephemeral port should be used, this method will return the actual bound port.
- *
* @return the port bound to this socket or 0 if the socket is closed or otherwise not connected
*/
public int getPort() {
@@ -265,8 +301,7 @@ public class TcpServer {
srv_sock.close();
} catch (java.io.IOException ex) {
- log.warn(
- "exception closing server socket during shutdown", ex);
+ log.warn("exception closing server socket during shutdown", ex);
}
if (shuttingDown) {
@@ -279,7 +314,7 @@ public class TcpServer {
}
handler.shutDown();
synchronized (this) {
-// this.shutDown = true;
+ // this.shutDown = true;
this.notifyAll();
}
}
@@ -297,7 +332,7 @@ public class TcpServer {
DataInputStream input = null;
Object request, response;
try {
- SocketCreator.getDefaultInstance().configureServerSSLSocket(sock);
+ socketCreator.configureServerSSLSocket(sock);
// if(log.isInfoEnabled()) log.info("accepted connection from " +
// sock.getInetAddress() +
// ':' + sock.getPort());
@@ -308,30 +343,25 @@ public class TcpServer {
} catch (StreamCorruptedException e) {
// bug 36679: Some garbage can be left on the socket stream
// if a peer disappears at exactly the wrong moment.
- log.debug(
- "Discarding illegal request from "
- + (sock.getInetAddress().getHostAddress() + ":" + sock
- .getPort()), e);
+ log.debug("Discarding illegal request from " + (sock.getInetAddress().getHostAddress() + ":" + sock.getPort()), e);
return;
}
int gossipVersion = input.readInt();
short versionOrdinal = Version.CURRENT_ORDINAL;
// Create a versioned stream to remember sender's GemFire version
- if (gossipVersion <= getCurrentGossipVersion()
- && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
- versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP
- .get(gossipVersion);
-// if (gossipVersion < getCurrentGossipVersion()) {
-// if (log.isTraceEnabled()) {
-// log.debug(
-// "Received request from "
-// + sock.getInetAddress().getHostAddress()
-// + " This locator is running: " + getCurrentGossipVersion()
-// + ", but request was version: " + gossipVersion
-// + ", version ordinal: " + versionOrdinal);
-// }
-// }
+ if (gossipVersion <= getCurrentGossipVersion() && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
+ versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
+ // if (gossipVersion < getCurrentGossipVersion()) {
+ // if (log.isTraceEnabled()) {
+ // log.debug(
+ // "Received request from "
+ // + sock.getInetAddress().getHostAddress()
+ // + " This locator is running: " + getCurrentGossipVersion()
+ // + ", but request was version: " + gossipVersion
+ // + ", version ordinal: " + versionOrdinal);
+ // }
+ // }
}
// Close the socket. We can not accept requests from a newer version
else {
@@ -346,8 +376,7 @@ public class TcpServer {
if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
log.debug("Locator reading request from " + sock.getInetAddress() + " with version " + Version.fromOrdinal(versionOrdinal, false));
}
- input = new VersionedDataInputStream(input, Version.fromOrdinal(
- versionOrdinal, false));
+ input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false));
request = DataSerializer.readObject(input);
if (log.isDebugEnabled()) {
log.debug("Locator received request " + request + " from " + sock.getInetAddress());
@@ -369,7 +398,7 @@ public class TcpServer {
}
handler.endRequest(request, startTime);
-
+
startTime = DistributionStats.getStatTime();
if (response != null) {
DataOutputStream output = new DataOutputStream(sock.getOutputStream());
@@ -381,7 +410,7 @@ public class TcpServer {
output.flush();
}
- handler.endResponse(request,startTime);
+ handler.endResponse(request, startTime);
} catch (EOFException ex) {
// client went away - ignore
@@ -392,24 +421,20 @@ public class TcpServer {
if (sock != null) {
sender = sock.getInetAddress().getHostAddress();
}
- log.info(
- "Unable to process request from " + sender + " exception=" + ex.getMessage());
+ log.info("Unable to process request from " + sender + " exception=" + ex.getMessage());
} catch (Exception ex) {
String sender = null;
if (sock != null) {
sender = sock.getInetAddress().getHostAddress();
}
- if(ex instanceof IOException) {
+ if (ex instanceof IOException) {
//IOException could be caused by a client failure. Don't
//log with severe.
if (!sock.isClosed()) {
- log.info(
- "Exception in processing request from " + sender, ex);
+ log.info("Exception in processing request from " + sender, ex);
}
- }
- else {
- log.fatal("Exception in processing request from " +
- sender, ex);
+ } else {
+ log.fatal("Exception in processing request from " + sender, ex);
}
} catch (VirtualMachineError err) {
@@ -429,8 +454,7 @@ public class TcpServer {
sender = sock.getInetAddress().getHostAddress();
}
try {
- log.fatal("Exception in processing request from " +
- sender, ex);
+ log.fatal("Exception in processing request from " + sender, ex);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
@@ -484,8 +508,8 @@ public class TcpServer {
/**
* Returns GossipVersion for older Gemfire versions.
- *
* @param ordinal
+ *
* @return gossip version
*/
public static int getGossipVersionForOrdinal(short ordinal) {
@@ -498,12 +522,12 @@ public class TcpServer {
Iterator<Map.Entry> itr = TcpServer.GOSSIP_TO_GEMFIRE_VERSION_MAP.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry entry = itr.next();
- short o = ((Short)entry.getValue()).shortValue();
+ short o = ((Short) entry.getValue()).shortValue();
if (o == ordinal) {
- return ((Integer)entry.getKey()).intValue();
- } else if (o < ordinal && o > closest ) {
+ return ((Integer) entry.getKey()).intValue();
+ } else if (o < ordinal && o > closest) {
closest = o;
- closestGV = ((Integer)entry.getKey()).intValue();
+ closestGV = ((Integer) entry.getKey()).intValue();
}
}
}
@@ -512,13 +536,11 @@ public class TcpServer {
}
public static int getCurrentGossipVersion() {
- return TcpServer.isTesting ? TcpServer.TESTVERSION
- : TcpServer.GOSSIPVERSION;
+ return TcpServer.isTesting ? TcpServer.TESTVERSION : TcpServer.GOSSIPVERSION;
}
public static int getOldGossipVersion() {
- return TcpServer.isTesting ? TcpServer.OLDTESTVERSION
- : TcpServer.OLDGOSSIPVERSION;
+ return TcpServer.isTesting ? TcpServer.OLDTESTVERSION : TcpServer.OLDGOSSIPVERSION;
}
public static Map getGossipVersionMapForTestOnly() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/AbstractConfig.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/AbstractConfig.java
index da4209b..812e307 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/AbstractConfig.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/AbstractConfig.java
@@ -17,25 +17,41 @@
package com.gemstone.gemfire.internal;
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Array;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
+
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.UnmodifiableException;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.FlowControlParams;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.net.SocketCreator;
-import java.io.*;
-import java.lang.reflect.Array;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
/**
* Provides an implementation of the {@link Config} interface
* that implements functionality that all {@link Config} implementations
* can share.
*/
public abstract class AbstractConfig implements Config {
+
/**
* Returns the string to use as the exception message when an attempt
* is made to set an unmodifiable attribute.
@@ -50,8 +66,9 @@ public abstract class AbstractConfig implements Config {
protected abstract Map getAttDescMap();
protected abstract Map<String, ConfigSource> getAttSourceMap();
-
+
public final static String sourceHeader = "PropertiesSourceHeader";
+
/**
* Set to true if most of the attributes can be modified.
* Set to false if most of the attributes are read only.
@@ -59,9 +76,9 @@ public abstract class AbstractConfig implements Config {
protected boolean _modifiableDefault() {
return false;
}
-
+
/**
- * Use {@link #toLoggerString()} instead. If you need to override this in a
+ * Use {@link #toLoggerString()} instead. If you need to override this in a
* subclass, be careful not to expose any private data or security related
* values. Fixing bug #48155 by not exposing all values.
*/
@@ -69,7 +86,7 @@ public abstract class AbstractConfig implements Config {
public final String toString() {
return getClass().getName() + "@" + Integer.toHexString(hashCode());
}
-
+
@Override
public String toLoggerString() {
StringWriter sw = new StringWriter();
@@ -77,7 +94,7 @@ public abstract class AbstractConfig implements Config {
printSourceSection(ConfigSource.runtime(), pw);
printSourceSection(ConfigSource.sysprop(), pw);
printSourceSection(ConfigSource.api(), pw);
- for (ConfigSource fileSource: getFileSources()) {
+ for (ConfigSource fileSource : getFileSources()) {
printSourceSection(fileSource, pw);
}
printSourceSection(ConfigSource.xml(), pw);
@@ -86,18 +103,19 @@ public abstract class AbstractConfig implements Config {
pw.close();
return sw.toString();
}
-
+
/***
* Gets the Map of GemFire properties and values from a given ConfigSource
* @param source
+ *
* @return map of GemFire properties and values
*/
public Map<String, String> getConfigPropsFromSource(ConfigSource source) {
Map<String, String> configProps = new HashMap<String, String>();
String[] validAttributeNames = getAttributeNames();
Map<String, ConfigSource> sm = getAttSourceMap();
-
- for (int i=0; i < validAttributeNames.length; i++) {
+
+ for (int i = 0; i < validAttributeNames.length; i++) {
String attName = validAttributeNames[i];
if (source == null) {
if (sm.get(attName) != null) {
@@ -110,22 +128,22 @@ public abstract class AbstractConfig implements Config {
}
return configProps;
}
-
+
/****
- * Gets all the GemFire properties defined using file(s)
+ * Gets all the GemFire properties defined using file(s)
* @return Map of GemFire properties and values set using property files
*/
public Map<String, String> getConfigPropsDefinedUsingFiles() {
Map<String, String> configProps = new HashMap<String, String>();
- for (ConfigSource fileSource: getFileSources()) {
+ for (ConfigSource fileSource : getFileSources()) {
configProps.putAll(getConfigPropsFromSource(fileSource));
}
return configProps;
}
-
+
private List<ConfigSource> getFileSources() {
ArrayList<ConfigSource> result = new ArrayList<ConfigSource>();
- for (ConfigSource cs: getAttSourceMap().values()) {
+ for (ConfigSource cs : getAttSourceMap().values()) {
if (cs.getType() == ConfigSource.Type.FILE || cs.getType() == ConfigSource.Type.SECURE_FILE) {
if (!result.contains(cs)) {
result.add(cs);
@@ -143,7 +161,7 @@ public abstract class AbstractConfig implements Config {
if (source != null && source.getType() == ConfigSource.Type.SECURE_FILE) {
secureSource = true;
}
- for (int i=0; i < validAttributeNames.length; i++) {
+ for (int i = 0; i < validAttributeNames.length; i++) {
String attName = validAttributeNames[i];
if (source == null) {
if (sm.get(attName) != null) {
@@ -161,7 +179,7 @@ public abstract class AbstractConfig implements Config {
}
}
// hide the shiro-init configuration for now. Remove after we can allow customer to specify shiro.ini file
- if(attName.equals(SECURITY_SHIRO_INIT)){
+ if (attName.equals(SECURITY_SHIRO_INIT)) {
continue;
}
pw.print(attName);
@@ -175,7 +193,7 @@ public abstract class AbstractConfig implements Config {
}
}
}
-
+
private boolean okToDisplayPropertyValue(String attName) {
if (attName.startsWith(SECURITY_PREFIX)) {
return false;
@@ -191,14 +209,16 @@ public abstract class AbstractConfig implements Config {
}
return true;
}
-
+
/**
* This class was added to fix bug 39382.
* It does this by overriding "keys" which is used by the store0
* implementation of Properties.
*/
protected static class SortedProperties extends Properties {
+
private static final long serialVersionUID = 7156507110684631135L;
+
@Override
public Enumeration keys() {
// the TreeSet gets the sorting we desire but is only safe
@@ -206,15 +226,15 @@ public abstract class AbstractConfig implements Config {
return Collections.enumeration(new TreeSet(keySet()));
}
}
-
+
public boolean isDeprecated(String attName) {
return false;
}
-
+
public Properties toProperties() {
Properties result = new SortedProperties();
String[] attNames = getAttributeNames();
- for (int i=0; i < attNames.length; i++) {
+ for (int i = 0; i < attNames.length; i++) {
if (isDeprecated(attNames[i])) {
continue;
}
@@ -222,12 +242,12 @@ public abstract class AbstractConfig implements Config {
}
return result;
}
+
public void toFile(File f) throws IOException {
FileOutputStream out = new FileOutputStream(f);
try {
toProperties().store(out, null);
- }
- finally {
+ } finally {
out.close();
}
}
@@ -243,7 +263,7 @@ public abstract class AbstractConfig implements Config {
return false;
}
String[] validAttributeNames = getAttributeNames();
- for (int i=0; i < validAttributeNames.length; i++) {
+ for (int i = 0; i < validAttributeNames.length; i++) {
String attName = validAttributeNames[i];
if (this.isDeprecated(attName)) {
// since toProperties skips isDeprecated sameAs
@@ -263,7 +283,7 @@ public abstract class AbstractConfig implements Config {
if (thisLength != otherLength) {
return false;
}
- for (int j=0; j < thisLength; j++) {
+ for (int j = 0; j < thisLength; j++) {
Object thisArrObj = Array.get(thisAtt, j);
Object otherArrObj = Array.get(otherAtt, j);
if (thisArrObj == otherArrObj) {
@@ -284,27 +304,30 @@ public abstract class AbstractConfig implements Config {
protected void checkAttributeName(String attName) {
String[] validAttNames = getAttributeNames();
if (!Arrays.asList(validAttNames).contains(attName.toLowerCase())) {
- throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_UNKNOWN_CONFIGURATION_ATTRIBUTE_NAME_0_VALID_ATTRIBUTE_NAMES_ARE_1.toLocalizedString(new Object[] {attName, SystemAdmin.join(validAttNames)}));
+ throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_UNKNOWN_CONFIGURATION_ATTRIBUTE_NAME_0_VALID_ATTRIBUTE_NAMES_ARE_1.toLocalizedString(new Object[] {
+ attName,
+ SystemAdmin.join(validAttNames)
+ }));
}
}
-
+
public String getAttribute(String attName) {
Object result = getAttributeObject(attName);
if (result instanceof String) {
- return (String)result;
+ return (String) result;
}
if (attName.equalsIgnoreCase(MEMBERSHIP_PORT_RANGE)) {
- int[] value = (int[])result;
- return ""+value[0]+"-"+value[1];
+ int[] value = (int[]) result;
+ return "" + value[0] + "-" + value[1];
}
if (result.getClass().isArray()) {
- return SystemAdmin.join((Object[])result);
+ return SystemAdmin.join((Object[]) result);
}
if (result instanceof InetAddress) {
- InetAddress addr = (InetAddress)result;
+ InetAddress addr = (InetAddress) result;
String addrName = null;
if (addr.isMulticastAddress() || !SocketCreator.resolve_dns) {
addrName = addr.getHostAddress(); // on Windows getHostName on mcast addrs takes ~5 seconds
@@ -327,6 +350,8 @@ public abstract class AbstractConfig implements Config {
try {
if (valueType.equals(String.class)) {
attObjectValue = attValue;
+ } else if (valueType.equals(String[].class)) {
+ attObjectValue = commaDelimitedStringToStringArray(attValue);
} else if (valueType.equals(Integer.class)) {
attObjectValue = Integer.valueOf(attValue);
} else if (valueType.equals(Long.class)) {
@@ -342,58 +367,81 @@ public abstract class AbstractConfig implements Config {
throw new IllegalArgumentException("expected a setting in the form X-Y but found no dash for attribute " + attName);
}
value[0] = Integer.valueOf(attValue.substring(0, minus)).intValue();
- value[1] = Integer.valueOf(attValue.substring(minus+1)).intValue();
+ value[1] = Integer.valueOf(attValue.substring(minus + 1)).intValue();
attObjectValue = value;
} else if (valueType.equals(InetAddress.class)) {
try {
attObjectValue = InetAddress.getByName(attValue);
} catch (UnknownHostException ex) {
- throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_BE_A_VALID_HOST_NAME_2.toLocalizedString(new Object[] {attName, attValue, ex.toString()}));
+ throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_BE_A_VALID_HOST_NAME_2.toLocalizedString(new Object[] {
+ attName,
+ attValue,
+ ex.toString()
+ }));
}
} else if (valueType.equals(String[].class)) {
if (attValue == null || attValue.length() == 0) {
attObjectValue = null;
} else {
String trimAttName = trimAttributeName(attName);
- throw new UnmodifiableException(LocalizedStrings.AbstractConfig_THE_0_CONFIGURATION_ATTRIBUTE_CAN_NOT_BE_SET_FROM_THE_COMMAND_LINE_SET_1_FOR_EACH_INDIVIDUAL_PARAMETER_INSTEAD.toLocalizedString(new Object[] {attName, trimAttName}));
+ throw new UnmodifiableException(LocalizedStrings.AbstractConfig_THE_0_CONFIGURATION_ATTRIBUTE_CAN_NOT_BE_SET_FROM_THE_COMMAND_LINE_SET_1_FOR_EACH_INDIVIDUAL_PARAMETER_INSTEAD
+ .toLocalizedString(new Object[] { attName, trimAttName }));
}
} else if (valueType.equals(FlowControlParams.class)) {
String values[] = attValue.split(",");
if (values.length != 3) {
- throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_HAVE_THREE_ELEMENTS_SEPARATED_BY_COMMAS.toLocalizedString(new Object[] {attName, attValue}));
+ throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_HAVE_THREE_ELEMENTS_SEPARATED_BY_COMMAS.toLocalizedString(new Object[] {
+ attName,
+ attValue
+ }));
}
int credits = 0;
- float thresh = (float)0.0;
+ float thresh = (float) 0.0;
int waittime = 0;
try {
credits = Integer.parseInt(values[0].trim());
- thresh = Float.valueOf(values[1].trim()).floatValue();
+ thresh = Float.valueOf(values[1].trim()).floatValue();
waittime = Integer.parseInt(values[2].trim());
- }
- catch (NumberFormatException e) {
- throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_BE_COMPOSED_OF_AN_INTEGER_A_FLOAT_AND_AN_INTEGER.toLocalizedString(new Object[] {attName, attValue}));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_BE_COMPOSED_OF_AN_INTEGER_A_FLOAT_AND_AN_INTEGER.toLocalizedString(new Object[] {
+ attName,
+ attValue
+ }));
}
attObjectValue = new FlowControlParams(credits, thresh, waittime);
} else {
- throw new InternalGemFireException(LocalizedStrings.AbstractConfig_UNHANDLED_ATTRIBUTE_TYPE_0_FOR_1.toLocalizedString(new Object[] {valueType, attName}));
+ throw new InternalGemFireException(LocalizedStrings.AbstractConfig_UNHANDLED_ATTRIBUTE_TYPE_0_FOR_1.toLocalizedString(new Object[] {
+ valueType,
+ attName
+ }));
}
} catch (NumberFormatException ex) {
- throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_BE_A_NUMBER.toLocalizedString(new Object[] {attName, attValue}));
+ throw new IllegalArgumentException(LocalizedStrings.AbstractConfig_0_VALUE_1_MUST_BE_A_NUMBER.toLocalizedString(new Object[] { attName, attValue }));
}
setAttributeObject(attName, attObjectValue, source);
}
+ private String[] commaDelimitedStringToStringArray(final String tokenizeString) {
+ StringTokenizer stringTokenizer = new StringTokenizer(tokenizeString, ",");
+ String[] strings = new String[stringTokenizer.countTokens()];
+ for (int i = 0; i < strings.length; i++) {
+ strings[i] = stringTokenizer.nextToken();
+ }
+ return strings;
+ }
+
/**
* Removes the last character of the input string and returns the trimmed name
*/
protected static String trimAttributeName(String attName) {
- return attName.substring(0, attName.length()-1);
+ return attName.substring(0, attName.length() - 1);
}
+
public String getAttributeDescription(String attName) {
checkAttributeName(attName);
if (!getAttDescMap().containsKey(attName)) {
throw new InternalGemFireException(LocalizedStrings.AbstractConfig_UNHANDLED_ATTRIBUTE_NAME_0.toLocalizedString(attName));
}
- return (String)getAttDescMap().get(attName);
+ return (String) getAttDescMap().get(attName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
index 95ca22b..3bd9be4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/AvailablePort.java
@@ -18,6 +18,7 @@ package com.gemstone.gemfire.internal;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import java.io.IOException;
import java.io.PrintStream;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/GemFireVersion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/GemFireVersion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/GemFireVersion.java
index baf03b1..95a8fdc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/GemFireVersion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/GemFireVersion.java
@@ -18,6 +18,7 @@ package com.gemstone.gemfire.internal;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import java.io.*;
import java.net.*;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatHelper.java
index 7ff0b1b..b46bae4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatHelper.java
@@ -19,6 +19,8 @@ package com.gemstone.gemfire.internal;
import com.gemstone.gemfire.*;
//import com.gemstone.gemfire.util.*;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.net.SocketCreator;
+
import java.net.InetAddress;
import java.net.UnknownHostException;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatSampler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatSampler.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatSampler.java
index 38fc83d..1be3257 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatSampler.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/HostStatSampler.java
@@ -26,6 +26,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import com.gemstone.gemfire.internal.statistics.CallbackSampler;
import com.gemstone.gemfire.internal.statistics.SampleCollector;
import com.gemstone.gemfire.internal.statistics.StatArchiveHandlerConfig;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/MigrationClient.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/MigrationClient.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/MigrationClient.java
index 2c81e90..6a0a76b 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/MigrationClient.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/MigrationClient.java
@@ -25,6 +25,7 @@ import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.net.SocketCreator;
import java.io.*;
import java.net.InetAddress;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/399a6387/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
deleted file mode 100644
index 8318afa..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
-
-/**
- * This class allows sockets to be closed without blocking.
- * In some cases we have seen a call of socket.close block for minutes.
- * This class maintains a thread pool for every other member we have
- * connected sockets to. Any request to close by default returns immediately
- * to the caller while the close is called by a background thread.
- * The requester can wait for a configured amount of time by setting
- * the "p2p.ASYNC_CLOSE_WAIT_MILLISECONDS" system property.
- * Idle threads that are not doing a close will timeout after 2 minutes.
- * This can be configured by setting the
- * "p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS" system property.
- * A pool exists for each remote address that we have a socket connected to.
- * That way if close is taking a long time to one address we can still get closes
- * done to another address.
- * Each address pool by default has at most 8 threads. This max threads can be
- * configured using the "p2p.ASYNC_CLOSE_POOL_MAX_THREADS" system property.
- */
-public class SocketCloser {
- private static final Logger logger = LogService.getLogger();
- /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
- static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
- /** Maximum number of threads that can be doing a socket close. Any close requests over this max will queue up waiting for a thread. */
- static final int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
- /** How many milliseconds the synchronous requester waits for the async close to happen. Default is 0. Prior releases waited 50ms. */
- static final long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
-
-
- /** map of thread pools of async close threads */
- private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
- private final long asyncClosePoolKeepAliveSeconds;
- private final int asyncClosePoolMaxThreads;
- private final long asyncCloseWaitTime;
- private final TimeUnit asyncCloseWaitUnits;
- private boolean closed;
-
- public SocketCloser() {
- this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_WAIT_MILLISECONDS, TimeUnit.MILLISECONDS);
- }
- public SocketCloser(int asyncClosePoolMaxThreads, long asyncCloseWaitMillis) {
- this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, asyncClosePoolMaxThreads, asyncCloseWaitMillis, TimeUnit.MILLISECONDS);
- }
- public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads, long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
- this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
- this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
- this.asyncCloseWaitTime = asyncCloseWaitTime;
- this.asyncCloseWaitUnits = asyncCloseWaitUnits;
- }
-
- public int getMaxThreads() {
- return this.asyncClosePoolMaxThreads;
- }
-
- private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
- synchronized (asyncCloseExecutors) {
- ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
- if (pool == null) {
- final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
- ThreadFactory tf = new ThreadFactory() {
- public Thread newThread(final Runnable command) {
- Thread thread = new Thread(tg, command);
- thread.setDaemon(true);
- return thread;
- }
- };
- BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
- pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
- pool.allowCoreThreadTimeOut(true);
- asyncCloseExecutors.put(address, pool);
- }
- return pool;
- }
- }
- /**
- * Call this method if you know all the resources in the closer
- * for the given address are no longer needed.
- * Currently a thread pool is kept for each address and if you
- * know that an address no longer needs its pool then you should
- * call this method.
- */
- public void releaseResourcesForAddress(String address) {
- synchronized (asyncCloseExecutors) {
- ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
- if (pool != null) {
- pool.shutdown();
- asyncCloseExecutors.remove(address);
- }
- }
- }
- private boolean isClosed() {
- synchronized (asyncCloseExecutors) {
- return this.closed;
- }
- }
- /**
- * Call close when you are all done with your socket closer.
- * If you call asyncClose after close is called then the
- * asyncClose will be done synchronously.
- */
- public void close() {
- synchronized (asyncCloseExecutors) {
- if (!this.closed) {
- this.closed = true;
- for (ThreadPoolExecutor pool: asyncCloseExecutors.values()) {
- pool.shutdown();
- }
- asyncCloseExecutors.clear();
- }
- }
- }
- private void asyncExecute(String address, Runnable r) {
- // Waiting 50ms for the async close request to complete is what the old (close per thread)
- // code did. But now that we will not create a thread for every close request
- // it seems better to let the thread that requested the close to move on quickly.
- // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
- // can be set to how many milliseconds to wait.
- if (this.asyncCloseWaitTime == 0) {
- getAsyncThreadExecutor(address).execute(r);
- } else {
- Future<?> future = getAsyncThreadExecutor(address).submit(r);
- try {
- future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- // We want this code to wait at most 50ms for the close to happen.
- // It is ok to ignore these exception and let the close continue
- // in the background.
- }
- }
- }
- /**
- * Closes the specified socket in a background thread.
- * In some cases we see close hang (see bug 33665).
- * Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
- * this method may block for a certain amount of time.
- * If it is called after the SocketCloser is closed then a normal
- * synchronous close is done.
- * @param sock the socket to close
- * @param address identifies who the socket is connected to
- * @param extra an optional Runnable with stuff to execute in the async thread
- */
- public void asyncClose(final Socket sock, final String address, final Runnable extra) {
- if (sock == null || sock.isClosed()) {
- return;
- }
- boolean doItInline = false;
- try {
- synchronized (asyncCloseExecutors) {
- if (isClosed()) {
- // this SocketCloser has been closed so do a synchronous, inline, close
- doItInline = true;
- } else {
- asyncExecute(address, new Runnable() {
- public void run() {
- Thread.currentThread().setName("AsyncSocketCloser for " + address);
- try {
- if (extra != null) {
- extra.run();
- }
- inlineClose(sock);
- } finally {
- Thread.currentThread().setName("unused AsyncSocketCloser");
- }
- }
- });
- }
- }
- } catch (OutOfMemoryError ignore) {
- // If we can't start a thread to close the socket just do it inline.
- // See bug 50573.
- doItInline = true;
- }
- if (doItInline) {
- if (extra != null) {
- extra.run();
- }
- inlineClose(sock);
- }
- }
-
-
- /**
- * Closes the specified socket
- * @param sock the socket to close
- */
- private static void inlineClose(final Socket sock) {
- // the next two statements are a mad attempt to fix bug
- // 36041 - segv in jrockit in pthread signaling code. This
- // seems to alleviate the problem.
- try {
- sock.shutdownInput();
- sock.shutdownOutput();
- } catch (Exception e) {
- }
- try {
- sock.close();
- } catch (IOException ignore) {
- } catch (VirtualMachineError err) {
- SystemFailure.initiateFailure(err);
- // If this ever returns, rethrow the error. We're poisoned
- // now, so don't let this thread continue.
- throw err;
- } catch (java.security.ProviderException pe) {
- // some ssl implementations have trouble with termination and throw
- // this exception. See bug #40783
- } catch (Error e) {
- // Whenever you catch Error or Throwable, you must also
- // catch VirtualMachineError (see above). However, there is
- // _still_ a possibility that you are dealing with a cascading
- // error condition, so you also need to check to see if the JVM
- // is still usable:
- SystemFailure.checkFailure();
- // Sun's NIO implementation has been known to throw Errors
- // that are caused by IOExceptions. If this is the case, it's
- // okay.
- if (e.getCause() instanceof IOException) {
- // okay...
- } else {
- throw e;
- }
- }
- }
-}