You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:19 UTC

[14/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
new file mode 100644
index 0000000..eca73fc
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionImpl.java
@@ -0,0 +1,416 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.NoRouteToHostException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.ForcedDisconnectException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.SocketUtils;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * A single client to server connection.
+ *
+ * Ops run through {@link #execute(Op)}, which holds this connection's monitor
+ * while an op is attempted so that two ops do not use the client to server
+ * connection at the same time. Gateway sender ops are the exception: they are
+ * attempted without taking the monitor.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class ConnectionImpl implements Connection {
+
+  private static final Logger logger = LogService.getLogger();
+
+  /**Test hook to simulate a client crashing. If true, we will
+   * not notify the server when we close the connection.
+   */
+  private static boolean TEST_DURABLE_CLIENT_CRASH = false;
+
+  private Socket theSocket;
+  private ByteBuffer commBuffer;
+  private ByteBuffer commBufferForAsyncRead;
+  private ServerQueueStatus status;
+  /** Set at the end of {@link #connect}; checked by {@link #destroy()}. */
+  private volatile boolean connectFinished;
+  private final AtomicBoolean destroyed = new AtomicBoolean();
+  private Endpoint endpoint;
+  /**
+   * In Gateway communication the version of the connected wan site
+   * will be stored here after a successful handshake.
+   */
+  private short wanSiteVersion = -1;
+  private final DistributedSystem ds;
+
+  private OutputStream out;
+  private InputStream in;
+
+  private long connectionID = Connection.DEFAULT_CONNECTION_ID;
+
+  private HandShake handShake;
+
+  /**
+   * Creates a connection that is not yet connected to a server.
+   *
+   * @param ds the distributed system, consulted when closing and by
+   *        {@link #getDistributedSystemId()}
+   * @param cancelCriterion not used by this class
+   */
+  public ConnectionImpl(DistributedSystem ds, CancelCriterion cancelCriterion) {
+    this.ds = ds;
+  }
+
+  /**
+   * Opens a socket to the given server, performs the handshake on it and
+   * takes a reference on the server's endpoint.
+   *
+   * @param endpointManager supplies the {@link Endpoint} for the server
+   * @param location host and port of the server to connect to
+   * @param handShake performs the greeting on the new socket
+   * @param socketBufferSize requested socket buffer size; also the size passed
+   *        when allocating the comm buffers
+   * @param handShakeTimeout timeout used while connecting and handshaking
+   * @param readTimeout socket timeout installed after the handshake
+   * @param communicationMode {@link Acceptor#GATEWAY_TO_GATEWAY} selects the
+   *        gateway SSL settings; any other mode selects the cache-server ones
+   * @param sender the gateway sender using this connection; may be null
+   * @return the queue status produced by the handshake
+   * @throws IOException if the server host is not reachable or the socket
+   *         cannot be set up
+   */
+  public ServerQueueStatus connect(EndpointManager endpointManager,
+      ServerLocation location, HandShake handShake, int socketBufferSize,
+      int handShakeTimeout, int readTimeout, byte communicationMode, GatewaySender sender)
+      throws IOException {
+    // NOTE(review): this instance is always replaced below; the call is kept in
+    // case getDefaultInstance() has initialization side effects - confirm.
+    SocketCreator sc = SocketCreator.getDefaultInstance();
+    DistributionConfig config = InternalDistributedSystem.getConnectedInstance().getConfig();
+    if (communicationMode == Acceptor.GATEWAY_TO_GATEWAY) {
+      sc = SocketCreator.createNonDefaultInstance(config.getGatewaySSLEnabled(),
+          config.getGatewaySSLRequireAuthentication(), config.getGatewaySSLProtocols(),
+          config.getGatewaySSLCiphers(), config.getGatewaySSLProperties());
+      if (sender != null && !sender.getGatewayTransportFilters().isEmpty()) {
+        sc.initializeTransportFilterClientSocketFactory(sender);
+      }
+    } else {
+      //If configured use SSL properties for cache-server
+      sc = SocketCreator.createNonDefaultInstance(config.getServerSSLEnabled(),
+          config.getServerSSLRequireAuthentication(),
+          config.getServerSSLProtocols(),
+          config.getServerSSLCiphers(),
+          config.getServerSSLProperties());
+    }
+    if (!sc
+        .isHostReachable(InetAddress.getByName(location.getHostName()))) {
+      throw new NoRouteToHostException("Server is not reachable: " + location.getHostName());
+    }
+    theSocket = sc.connectForClient(
+        location.getHostName(), location.getPort(), handShakeTimeout, socketBufferSize);
+    theSocket.setTcpNoDelay(true);
+    theSocket.setSendBufferSize(socketBufferSize);
+
+    // Verify buffer sizes
+    verifySocketBufferSize(socketBufferSize, theSocket.getReceiveBufferSize(), "receive");
+    verifySocketBufferSize(socketBufferSize, theSocket.getSendBufferSize(), "send");
+
+    // the handshake runs under its own timeout; the read timeout is installed afterwards
+    theSocket.setSoTimeout(handShakeTimeout);
+    out = SocketUtils.getOutputStream(theSocket);
+    in = SocketUtils.getInputStream(theSocket);
+    this.status = handShake.greet(this, location, communicationMode);
+    commBuffer = ServerConnection.allocateCommBuffer(socketBufferSize);
+    if (sender != null) {
+      commBufferForAsyncRead = ServerConnection
+          .allocateCommBuffer(socketBufferSize);
+    }
+    theSocket.setSoTimeout(readTimeout);
+    endpoint = endpointManager.referenceEndpoint(location, this.status.getMemberId());
+    this.connectFinished = true;
+    this.endpoint.getStats().incConnections(1);
+    return status;
+  }
+
+  /**
+   * Closes this connection. A close message is first sent to the server
+   * unless no socket was ever opened, the server host is not reachable, the
+   * durable client crash test hook is set, or distribution was stopped by a
+   * forced disconnect. The connection is destroyed in every case.
+   *
+   * @param keepAlive passed on with the close message
+   * @throws Exception if sending the close message fails
+   */
+  public void close(boolean keepAlive) throws Exception {
+    try {
+      // connect() never got as far as opening a socket: no server to notify
+      if (this.theSocket == null) {
+        return;
+      }
+      SocketCreator sc = SocketCreator.getDefaultInstance();
+      if (!sc.isHostReachable(this.theSocket.getInetAddress())) {
+        return;
+      }
+
+      boolean sendCloseMsg = !TEST_DURABLE_CLIENT_CRASH;
+      if (sendCloseMsg) {
+        try {
+          ((InternalDistributedSystem)ds).getDistributionManager();
+        }
+        catch (CancelException e) { // distribution has stopped
+          Throwable t = e.getCause();
+          if (t instanceof ForcedDisconnectException) {
+            // we're crashing - don't attempt to send a message (bug 39317)
+            sendCloseMsg = false;
+          }
+        }
+      }
+
+      if (sendCloseMsg) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Closing connection {} with keepAlive: {}", this, keepAlive);
+        }
+        CloseConnectionOp.execute(this, keepAlive);
+      }
+    }
+    finally {
+      destroy();
+    }
+  }
+
+  /**
+   * Closes the socket without notifying the server, ignoring any failure.
+   */
+  public void emergencyClose() {
+    commBuffer = null;
+    Socket socket = theSocket;
+    if (socket == null) {
+      // never connected, nothing to close
+      return;
+    }
+    try {
+      socket.close();
+    } catch (IOException ignored) {
+      // best effort only
+    } catch (RuntimeException ignored) {
+      // best effort only
+    }
+  }
+
+  /** @return true once {@link #destroy()} has been called */
+  public boolean isDestroyed() {
+    return this.destroyed.get();
+  }
+
+  /**
+   * Releases this connection's endpoint reference and closes the socket.
+   * Only the first call has any effect.
+   */
+  public void destroy() {
+    if (!this.destroyed.compareAndSet(false, true)) {
+      // was already set to true so someone else did the destroy
+      return;
+    }
+
+    if (endpoint != null) {
+      if (this.connectFinished) {
+        // undo the increment made at the end of connect()
+        endpoint.getStats().incConnections(-1);
+      }
+      endpoint.removeReference();
+    }
+    try {
+      if (theSocket != null) {
+        theSocket.close();
+      }
+    }
+    catch (Exception e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(e.getMessage(), e);
+      }
+    }
+  }
+
+  public ByteBuffer getCommBuffer() {
+    return commBuffer;
+  }
+
+  /** @return the location of the endpoint this connection references */
+  public ServerLocation getServer() {
+    return endpoint.getLocation();
+  }
+
+  public Socket getSocket() {
+    return theSocket;
+  }
+
+  public OutputStream getOutputStream() {
+    return out;
+  }
+
+  public InputStream getInputStream() {
+    return in;
+  }
+
+  /** @return the stats of the endpoint this connection references */
+  public ConnectionStats getStats() {
+    return endpoint.getStats();
+  }
+
+  @Override
+  public String toString() {
+    return "Connection[" + endpoint + "]@" + this.hashCode();
+  }
+
+  public Endpoint getEndpoint() {
+    return endpoint;
+  }
+
+  /** @return the queue status produced by the handshake in {@link #connect} */
+  public ServerQueueStatus getQueueStatus() {
+    return status;
+  }
+
+  /**
+   * Attempts the given op on this connection and records the execution on
+   * the endpoint. Function execution ops run with the client function
+   * timeout installed on the socket; the previous timeout is put back
+   * afterwards, whether or not the op succeeded.
+   *
+   * @param op the op to attempt
+   * @return whatever the op's attempt returns
+   * @throws Exception if the attempt fails
+   */
+  public Object execute(Op op) throws Exception {
+    Object result;
+    // Do not synchronize when used for GatewaySender
+    // as the same connection is being used
+    if ((op instanceof AbstractOp) && ((AbstractOp)op).isGatewaySenderOp()) {
+      result = op.attempt(this);
+      endpoint.updateLastExecute();
+      return result;
+    }
+    synchronized (this) {
+      if (op instanceof ExecuteFunctionOpImpl
+          || op instanceof ExecuteRegionFunctionOpImpl
+          || op instanceof ExecuteRegionFunctionSingleHopOpImpl) {
+        result = attemptWithFunctionTimeout(op);
+      } else {
+        result = op.attempt(this);
+      }
+    }
+    endpoint.updateLastExecute();
+    return result;
+  }
+
+  /**
+   * Attempts a function execution op with the client function timeout on the
+   * socket, restoring the previous timeout even when the attempt throws.
+   */
+  private Object attemptWithFunctionTimeout(Op op) throws Exception {
+    int earlierTimeout = this.getSocket().getSoTimeout();
+    this.getSocket().setSoTimeout(GemFireCacheImpl.getClientFunctionTimeout());
+    boolean succeeded = false;
+    try {
+      Object result = op.attempt(this);
+      succeeded = true;
+      return result;
+    } finally {
+      if (succeeded) {
+        this.getSocket().setSoTimeout(earlierTimeout);
+      } else {
+        try {
+          this.getSocket().setSoTimeout(earlierTimeout);
+        } catch (IOException ignored) {
+          // the socket may already be unusable; let the op's own failure propagate
+        }
+      }
+    }
+  }
+
+  public static void loadEmergencyClasses() {
+    //do nothing
+  }
+
+  public short getWanSiteVersion(){
+    return wanSiteVersion;
+  }
+
+  public void setWanSiteVersion(short wanSiteVersion){
+    this.wanSiteVersion = wanSiteVersion;
+  }
+
+  public int getDistributedSystemId() {
+    return ((InternalDistributedSystem)this.ds).getDistributionManager().getDistributedSystemId();
+  }
+
+  public void setConnectionID(long id) {
+    this.connectionID = id;
+  }
+
+  public long getConnectionID() {
+    return this.connectionID;
+  }
+
+  protected HandShake getHandShake() {
+    return handShake;
+  }
+
+  protected void setHandShake(HandShake handShake) {
+    this.handShake = handShake;
+  }
+
+  /**
+   * test hook
+   */
+  public static void setTEST_DURABLE_CLIENT_CRASH(boolean v) {
+    TEST_DURABLE_CLIENT_CRASH = v;
+  }
+
+  public ByteBuffer getCommBufferForAsyncRead() {
+    return commBufferForAsyncRead;
+  }
+
+  /**
+   * Logs an info message when the socket's actual buffer size is smaller
+   * than the requested one.
+   */
+  private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
+    if (actualBufferSize < requestedBufferSize) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.Connection_SOCKET_0_IS_1_INSTEAD_OF_THE_REQUESTED_2,
+          new Object[] { type + " buffer size", actualBufferSize, requestedBufferSize }));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java
new file mode 100644
index 0000000..6cf5d65
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionSource.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+import java.util.Set;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+
+
+/**
+ * A source for discovering servers and finding the least loaded
+ * server to connect to.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface ConnectionSource {
+
+  /**
+   * Finds a server to connect to.
+   * @param excludedServers the servers to exclude from the search
+   * @return the server to connect to. NOTE(review): presumably
+   *         <code>null</code> if no server is found, as documented for
+   *         {@link #findReplacementServer} - confirm with the implementations
+   */
+  ServerLocation findServer(Set<ServerLocation> excludedServers);
+
+  /**
+   * Asks if we should replace a connection to <code>currentServer</code>
+   * with one to the returned server.
+   * @param currentServer the server we currently have a connection to.
+   * @param excludedServers the replacement server can not be one in this set
+   * @return the server we should connect to;
+   *         <code>currentServer</code> if a replacement is not needed;
+   *         <code>null</code> if no server found
+   */
+  ServerLocation findReplacementServer(ServerLocation currentServer, Set<ServerLocation> excludedServers);
+
+  /**
+   * Find the servers to host the queue
+   *
+   * @param excludedServers
+   *                the servers to exclude from the search
+   * @param numServers
+   *                the number of servers to find, or -1 if we should just find
+   *                all of them
+   * @param proxyId
+   *                the proxy id for this client
+   * @param findDurableQueue
+   *                if true, the source should make an effort to find the
+   *                durable queues for this client
+   * @return a list of locations to connect to
+   */
+  List<ServerLocation> findServersForQueue(
+      Set<ServerLocation> excludedServers, int numServers,
+      ClientProxyMembershipID proxyId, boolean findDurableQueue);
+
+  /**
+   * Starts this connection source.
+   * @param poolImpl the pool this source is started with
+   */
+  void start(InternalPool poolImpl);
+
+  /**
+   * Stops this connection source.
+   */
+  void stop();
+
+  /**
+   * Check to see if the load on the servers is balanced, according
+   * to this connection source.
+   * @return true if the servers have balanced load.
+   */
+  boolean isBalanced();
+}