You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:19 UTC
[14/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
new file mode 100644
index 0000000..eca73fc
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
@@ -0,0 +1,326 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.NoRouteToHostException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.SocketUtils;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * A single client to server connection.
+ *
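+ * The execute method of this class synchronizes on the connection to
+ * prevent two ops from using the client to server connection
+ * at the same time. Gateway sender ops are the exception: they run
+ * unsynchronized because they share the same connection.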
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class ConnectionImpl implements Connection {
+
+ /** Logger shared by every connection; never reassigned, hence final. */
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+  * Test hook to simulate a client crashing. If true, we will
+  * not notify the server when we close the connection.
+  */
+ private static boolean TEST_DURABLE_CLIENT_CRASH = false;
+
+ /** Socket to the server; assigned in connect and null until then. */
+ private Socket theSocket;
+ /** Buffer allocated in connect; set back to null by emergencyClose. */
+ private ByteBuffer commBuffer;
+ /** Second buffer, allocated in connect only when a gateway sender is supplied. */
+ private ByteBuffer commBufferForAsyncRead;
+// private int handShakeTimeout = AcceptorImpl.DEFAULT_HANDSHAKE_TIMEOUT_MS;
+ /** Queue status returned by the handshake in connect. */
+ private ServerQueueStatus status;
+ /** Set at the end of connect; destroy only decrements the connection count when it is set. */
+ private volatile boolean connectFinished;
+ /** Flipped by the first call to destroy so that the cleanup runs only once. */
+ private final AtomicBoolean destroyed = new AtomicBoolean();
+ /** Endpoint obtained from the endpoint manager in connect; null until then. */
+ private Endpoint endpoint;
+ /**
+  * In Gateway communication the version of the connected wan site
+  * will be stored after a successful handshake.
+  */
+ private short wanSiteVersion = -1;
+// private final CancelCriterion cancelCriterion;
+ /** Distributed system passed to the constructor. */
+ private final DistributedSystem ds;
+
+ /** Output stream of the socket, obtained in connect. */
+ private OutputStream out;
+ /** Input stream of the socket, obtained in connect. */
+ private InputStream in;
+
+ /** Connection id; stays at the default until setConnectionID is called. */
+ private long connectionID = Connection.DEFAULT_CONNECTION_ID;
+
+ /** Handshake set through setHandShake; connect does not assign this field itself. */
+ private HandShake handShake;
+
+ /**
+ * Creates a connection that is not yet connected; connect opens the socket.
+ *
+ * @param ds the distributed system stored for later use by this connection
+ * @param cancelCriterion currently unused, its assignment below is commented out
+ */
+ public ConnectionImpl(DistributedSystem ds, CancelCriterion cancelCriterion) {
+// this.cancelCriterion = cancelCriterion;
+ this.ds = ds;
+ }
+
+ /**
+ * Opens a socket to the given server, performs the handshake and registers
+ * this connection with the endpoint of that server.
+ *
+ * @param endpointManager used to obtain the endpoint once the handshake has returned
+ * @param location host and port of the server to connect to
+ * @param handShake performs the initial greeting on the new socket
+ * @param socketBufferSize requested socket buffer size; also passed to the
+ * comm buffer allocation
+ * @param handShakeTimeout passed to the socket creator when connecting and used as
+ * the socket read timeout while the handshake runs
+ * @param readTimeout socket read timeout installed after the handshake
+ * @param communicationMode gateway SSL settings are used when this is
+ * Acceptor.GATEWAY_TO_GATEWAY, cache-server SSL settings otherwise
+ * @param sender gateway sender, may be null; when non-null a second comm buffer
+ * is allocated for async reads
+ * @return the queue status returned by the handshake
+ * @throws NoRouteToHostException if the server host is reported as not reachable
+ * @throws IOException if a socket operation or the handshake fails
+ */
+ public ServerQueueStatus connect(EndpointManager endpointManager,
+ ServerLocation location, HandShake handShake, int socketBufferSize,
+ int handShakeTimeout, int readTimeout, byte communicationMode, GatewaySender sender)
+ throws IOException {
+ // This default instance is always replaced by one of the two branches below
+ SocketCreator sc = SocketCreator.getDefaultInstance();
+ // NOTE(review): getConnectedInstance() is dereferenced without a null check -
+ // confirm it cannot return null on this path
+ DistributionConfig config = InternalDistributedSystem.getConnectedInstance().getConfig();
+ if (communicationMode == Acceptor.GATEWAY_TO_GATEWAY) {
+ sc = SocketCreator.createNonDefaultInstance(config.getGatewaySSLEnabled(),
+ config.getGatewaySSLRequireAuthentication(), config.getGatewaySSLProtocols(),
+ config.getGatewaySSLCiphers(), config.getGatewaySSLProperties());
+ if (sender!= null && !sender.getGatewayTransportFilters().isEmpty()) {
+ sc.initializeTransportFilterClientSocketFactory(sender);
+ }
+ } else {
+ //If configured use SSL properties for cache-server
+ sc = SocketCreator.createNonDefaultInstance(config.getServerSSLEnabled(),
+ config.getServerSSLRequireAuthentication(),
+ config.getServerSSLProtocols(),
+ config.getServerSSLCiphers(),
+ config.getServerSSLProperties());
+ }
+ // Fail before opening a socket when the host is not reachable
+ if (!sc
+ .isHostReachable(InetAddress.getByName(location.getHostName()))) {
+ throw new NoRouteToHostException("Server is not reachable: " + location.getHostName());
+ }
+ theSocket = sc.connectForClient(
+ location.getHostName(), location.getPort(), handShakeTimeout, socketBufferSize);
+ theSocket.setTcpNoDelay(true);
+ //System.out.println("ConnectionImpl setting buffer sizes: " +
+ // socketBufferSize);
+ // NOTE(review): only the send buffer size is set here; presumably connectForClient
+ // applies socketBufferSize to the receive buffer - confirm
+ theSocket.setSendBufferSize(socketBufferSize);
+
+ // Verify buffer sizes
+ verifySocketBufferSize(socketBufferSize, theSocket.getReceiveBufferSize(), "receive");
+ verifySocketBufferSize(socketBufferSize, theSocket.getSendBufferSize(), "send");
+
+ // Bound the handshake reads; the regular read timeout is installed further down
+ theSocket.setSoTimeout(handShakeTimeout);
+ out = SocketUtils.getOutputStream(theSocket);//theSocket.getOutputStream();
+ in = SocketUtils.getInputStream(theSocket);//theSocket.getInputStream();
+ this.status = handShake.greet(this, location, communicationMode);
+ commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize);
+ // The async-read buffer is only allocated when a gateway sender was supplied
+ if (sender != null) {
+ commBufferForAsyncRead = ServerConnection
+ .allocateCommBuffer(socketBufferSize);
+ }
+ theSocket.setSoTimeout(readTimeout);
+ endpoint = endpointManager.referenceEndpoint(location, this.status.getMemberId());
+ //logger.warning("ESTABLISHING ENDPOINT:"+location+" MEMBERID:"+endpoint.getMemberId(),new Exception());
+ // destroy() only decrements the connection count when this flag is set
+ this.connectFinished = true;
+ this.endpoint.getStats().incConnections(1);
+ return status;
+ }
+
+ public void close(boolean keepAlive) throws Exception {
+
+ try {
+ // if a forced-disconnect has occurred, we can't send messages to anyone
+ SocketCreator sc = SocketCreator.getDefaultInstance();
+ if (!sc.isHostReachable(this.theSocket.getInetAddress())) {
+ return;
+ }
+
+ boolean sendCloseMsg = !TEST_DURABLE_CLIENT_CRASH;
+ if (sendCloseMsg) {
+ try {
+ ((InternalDistributedSystem)ds).getDistributionManager();
+ }
+ catch (CancelException e) { // distribution has stopped
+ Throwable t = e.getCause();
+ if (t instanceof ForcedDisconnectException) {
+ // we're crashing - don't attempt to send a message (bug 39317)
+ sendCloseMsg = false;
+ }
+ }
+ }
+
+ if (sendCloseMsg) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing connection {} with keepAlive: {}", this, keepAlive);
+ }
+ CloseConnectionOp.execute(this, keepAlive);
+ }
+ }
+ finally {
+ destroy();
+ }
+ }
+
+ /**
+  * Drops the comm buffer and closes the socket, ignoring any failure.
+  */
+ public void emergencyClose() {
+   this.commBuffer = null;
+   try {
+     this.theSocket.close();
+   } catch (IOException ignored) {
+     // ignore
+   } catch (RuntimeException ignored) {
+     // ignore; this also covers a socket that was never created
+   }
+ }
+
+ /**
+  * @return true once destroy has been called on this connection
+  */
+ public boolean isDestroyed() {
+   return destroyed.get();
+ }
+
+ public void destroy() {
+ if (!this.destroyed.compareAndSet(false, true)) {
+ // was already set to true so someone else did the destroy
+ return;
+ }
+
+ if (endpoint != null) {
+ if (this.connectFinished) {
+ endpoint.getStats().incConnections(-1);
+ }
+ endpoint.removeReference();
+ }
+ try {
+ if (theSocket != null)
+ theSocket.close();
+ }
+ catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+  * @return the buffer allocated by connect; null before connect and after emergencyClose
+  */
+ public ByteBuffer getCommBuffer() {
+   return this.commBuffer;
+ }
+
+ /**
+  * @return the location of the endpoint this connection references; the endpoint
+  *         is only assigned by connect
+  */
+ public ServerLocation getServer() {
+   return this.endpoint.getLocation();
+ }
+
+ /**
+  * @return the socket opened by connect, or null if connect has not opened one
+  */
+ public Socket getSocket() {
+   return this.theSocket;
+ }
+
+ /**
+  * @return the output stream obtained for the socket in connect
+  */
+ public OutputStream getOutputStream() {
+   return this.out;
+ }
+
+ /**
+  * @return the input stream obtained for the socket in connect
+  */
+ public InputStream getInputStream() {
+   return this.in;
+ }
+
+
+ /**
+  * @return the statistics of the endpoint this connection references; the endpoint
+  *         is only assigned by connect
+  */
+ public ConnectionStats getStats() {
+   return this.endpoint.getStats();
+ }
+
+ /**
+  * @return a description made of the endpoint and this object's hash code
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder("Connection[");
+   text.append(this.endpoint).append("]@").append(hashCode());
+   return text.toString();
+ }
+
+ /**
+  * @return the endpoint obtained in connect, or null if connect has not assigned one
+  */
+ public Endpoint getEndpoint() {
+   return this.endpoint;
+ }
+
+ /**
+  * @return the queue status returned by the handshake in connect
+  */
+ public ServerQueueStatus getQueueStatus() {
+   return this.status;
+ }
+
+ /**
+  * Runs the given op on this connection and records the time of the execution
+  * on the endpoint.
+  *
+  * Gateway sender ops run without synchronization because they share the
+  * connection. All other ops run while holding this connection's monitor.
+  * Function execution ops additionally run with the client function timeout
+  * installed on the socket.
+  *
+  * @param op the operation to attempt on this connection
+  * @return whatever the op's attempt returns
+  * @throws Exception whatever the op's attempt throws, or a socket error raised
+  *         while changing the socket timeout
+  */
+ public Object execute(Op op) throws Exception {
+   // Do not synchronize when used for GatewaySender
+   // as the same connection is being used
+   if ((op instanceof AbstractOp) && ((AbstractOp) op).isGatewaySenderOp()) {
+     Object gatewayResult = op.attempt(this);
+     endpoint.updateLastExecute();
+     return gatewayResult;
+   }
+   final Object result;
+   synchronized (this) {
+     if (op instanceof ExecuteFunctionOpImpl
+         || op instanceof ExecuteRegionFunctionOpImpl
+         || op instanceof ExecuteRegionFunctionSingleHopOpImpl) {
+       result = attemptWithFunctionTimeout(op);
+     } else {
+       result = op.attempt(this);
+     }
+   }
+   endpoint.updateLastExecute();
+   return result;
+ }
+
+ /**
+  * Attempts the op with the client function timeout installed on the socket and
+  * puts the earlier timeout back afterwards, also when the op fails. Without
+  * this a failed function call would leave the function timeout in place for
+  * later ops on the same connection.
+  */
+ private Object attemptWithFunctionTimeout(Op op) throws Exception {
+   final Socket socket = getSocket();
+   final int earlierTimeout = socket.getSoTimeout();
+   socket.setSoTimeout(GemFireCacheImpl.getClientFunctionTimeout());
+   boolean succeeded = false;
+   try {
+     final Object result = op.attempt(this);
+     succeeded = true;
+     return result;
+   } finally {
+     try {
+       socket.setSoTimeout(earlierTimeout);
+     } catch (IOException e) {
+       if (succeeded) {
+         // Same outcome as before this fix: the restore failure is reported
+         throw e;
+       }
+       // The op already failed; do not let the restore failure hide its exception
+       if (logger.isDebugEnabled()) {
+         logger.debug(e.getMessage(), e);
+       }
+     }
+   }
+ }
+
+
+ /**
+  * Intentionally empty.
+  */
+ public static void loadEmergencyClasses() {
+   // nothing to do
+ }
+ /**
+  * @return the stored wan site version; -1 until setWanSiteVersion is called
+  */
+ public short getWanSiteVersion() {
+   return this.wanSiteVersion;
+ }
+
+ /**
+  * @param wanSiteVersion the version of the connected wan site to remember
+  */
+ public void setWanSiteVersion(short wanSiteVersion) {
+   this.wanSiteVersion = wanSiteVersion;
+ }
+
+ /**
+  * @return the distributed system id reported by the distribution manager of
+  *         this connection's distributed system
+  */
+ public int getDistributedSystemId() {
+   final InternalDistributedSystem system = (InternalDistributedSystem) this.ds;
+   return system.getDistributionManager().getDistributedSystemId();
+ }
+
+ /**
+  * @param id the connection id to store
+  */
+ public void setConnectionID(long id) {
+   connectionID = id;
+ }
+
+ /**
+  * @return the stored connection id; Connection.DEFAULT_CONNECTION_ID until one is set
+  */
+ public long getConnectionID() {
+   return connectionID;
+ }
+
+ /**
+  * @return the handshake stored by setHandShake, or null if none was stored
+  */
+ protected HandShake getHandShake() {
+   return this.handShake;
+ }
+
+ /**
+  * @param handShake the handshake to store on this connection
+  */
+ protected void setHandShake(HandShake handShake) {
+   this.handShake = handShake;
+ }
+
+ /**
+  * Test hook: when set to true, close does not send the close message to the server.
+  *
+  * @param v the new value of the hook
+  */
+ public static void setTEST_DURABLE_CLIENT_CRASH(boolean v) {
+   ConnectionImpl.TEST_DURABLE_CLIENT_CRASH = v;
+ }
+
+ /**
+  * @return the async-read buffer; null unless connect was given a gateway sender
+  */
+ public ByteBuffer getCommBufferForAsyncRead() {
+   return this.commBufferForAsyncRead;
+ }
+
+ private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
+ if (actualBufferSize < requestedBufferSize) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.Connection_SOCKET_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
+ new Object[] { new StringBuilder(type).append(" buffer size").toString(), actualBufferSize, requestedBufferSize }));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java
new file mode 100644
index 0000000..6cf5d65
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java
@@ -0,0 +1,68 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+
+
+/**
+ * A source for discovering servers and finding the least loaded
+ * server to connect to.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface ConnectionSource {
+
+ /**
+ * Finds a server to connect to.
+ * @param excludedServers presumably the servers that must not be returned,
+ * as for findReplacementServer - confirm against the implementations
+ * @return the server to connect to; NOTE(review): probably
+ * <code>null</code> if no server is found, as for findReplacementServer - confirm
+ */
+ ServerLocation findServer(Set/*<ServerLocation>*/ excludedServers);
+
+ /**
+ * Asks if we should replace a connection to <code>currentServer</code>
+ * with one to the returned server.
+ * @param currentServer the server we currently have a connection to.
+ * @param excludedServers the replacement server can not be one in this set
+ * @return the server we should connect to;
+ * <code>currentServer</code> if a replacement is not needed;
+ * <code>null</code> if no server found
+ */
+ ServerLocation findReplacementServer(ServerLocation currentServer, Set/*<ServerLocation>*/ excludedServers);
+
+ /**
+ * Find the servers to host the queue
+ *
+ * @param excludedServers
+ * the servers to exclude from the search
+ * @param numServers
+ * the number of servers to find, or -1 if we should just find
+ * all of them
+ * @param proxyId
+ * the proxy id for this client
+ * @param findDurableQueue
+ * if true, the source should make an effort to find the
+ * durable queues for this client
+ * @return a list of locations to connect to
+ */
+ List/* ServerLocation */findServersForQueue(
+ Set/* <ServerLocation> */excludedServers, int numServers,
+ ClientProxyMembershipID proxyId, boolean findDurableQueue);
+
+ /**
+ * Starts this connection source.
+ * @param poolImpl the pool this source is started for (assumed from the
+ * parameter type - confirm against the implementations)
+ */
+ void start(InternalPool poolImpl);
+
+ /**
+ * Stops this connection source.
+ */
+ void stop();
+
+ /**
+ * Check to see if the load on the servers is balanced, according
+ * to this connection source.
+ * @return true if the servers have balanced load.
+ */
+ boolean isBalanced();
+}