You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:14 UTC

[09/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java
new file mode 100644
index 0000000..f15260f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/InvalidateOp.java
@@ -0,0 +1,112 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Invalidates a region entry on a server.
+ * @author gregp
+ * @since 6.6
+ */
+public class InvalidateOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  public static final int HAS_VERSION_TAG = 0x01; // bit in the reply's flags part: set when a VersionTag part follows
+  
+  /**
+   * Invalidates an entry on a server, using connections from the given pool.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the entry invalidate on
+   * @param event supplies the key, event id and optional callback argument to send
+   */
+  public static void execute(ExecutablePool pool,
+                            String region, EntryEventImpl event)
+  {
+    AbstractOp op = new InvalidateOpImpl(region, event);
+    pool.execute(op);
+  }
+                                                               
+  private InvalidateOp() {
+    // no instances allowed
+  }
+  
+  private static class InvalidateOpImpl extends AbstractOp {
+    private EntryEventImpl event; // kept so processResponse can store the returned version tag on it
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public InvalidateOpImpl(String region,
+                         EntryEventImpl event) {
+      super(MessageType.INVALIDATE, event.getCallbackArgument() != null ? 4 : 3); // 3 fixed parts plus an optional callback-argument part
+      Object callbackArg = event.getCallbackArgument();
+      this.event = event;
+      getMessage().addStringPart(region);
+      getMessage().addStringOrObjPart(event.getKeyInfo().getKey());
+      getMessage().addBytesPart(event.getEventId().calcBytes());
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException(); // not supported here; the (Message, Connection) overload below does the work
+    }
+    
+    
+    @Override
+    protected Object processResponse(Message msg, Connection con) throws Exception {
+       processAck(msg, "invalidate");
+       boolean isReply = (msg.getMessageType() == MessageType.REPLY); // the flags part is only read from REPLY messages
+       int partIdx = 0; // index of the next reply part to read
+       int flags = 0;
+       if (isReply) {
+         flags = msg.getPart(partIdx++).getInt();
+         if ((flags & HAS_VERSION_TAG) != 0) {
+           VersionTag tag = (VersionTag)msg.getPart(partIdx++).getObject();
+           // we use the client's ID since we apparently don't track the server's ID in connections
+           tag.replaceNullIDs((InternalDistributedMember)  con.getEndpoint().getMemberId());
+           this.event.setVersionTag(tag);
+           if (logger.isDebugEnabled()) {
+             logger.debug("received Invalidate response with {}", tag);
+           }
+         } else {
+           if (logger.isDebugEnabled()) {
+             logger.debug("received Invalidate response");
+           }
+         }
+       }
+       return null; // this op produces no result value
+    }
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.INVALIDATE_ERROR;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startInvalidate();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endInvalidateSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endInvalidate(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java
new file mode 100644
index 0000000..26cc4a2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/KeySetOp.java
@@ -0,0 +1,121 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+
+/**
+ * Does a region keySet on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class KeySetOp {
+  /**
+   * Does a region keySet on a server, using connections from the given pool.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the keySet on
+   * @return the set of keys assembled from the server's chunked response
+   */
+  public static Set execute(ExecutablePool pool,
+                            String region)
+  {
+    AbstractOp op = new KeySetOpImpl(region);
+    return (Set)pool.execute(op);
+  }
+                                                               
+  private KeySetOp() {
+    // no instances allowed
+  }
+  
+  private static class KeySetOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public KeySetOpImpl(String region) {
+      super(MessageType.KEY_SET, 1); // one part: the region name
+      getMessage().addStringPart(region);
+    }
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(1, Version.CURRENT); // processResponse below reads the reply chunk by chunk
+    }
+    @Override  
+    protected Object processResponse(Message msg) throws Exception {
+      
+      ChunkedMessage keySetResponseMessage = (ChunkedMessage)msg;
+      final HashSet result = new HashSet();
+      final Exception[] exceptionRef = new Exception[1]; // holds an exception sent inside a chunk; thrown after all chunks are read
+      
+      keySetResponseMessage.readHeader();
+      final int msgType = keySetResponseMessage.getMessageType();
+      if (msgType == MessageType.RESPONSE) {
+        do {
+          keySetResponseMessage.receiveChunk();
+          //callback.handle(msg);
+          Part part = keySetResponseMessage.getPart(0);
+          Object o = part.getObject();
+          if (o instanceof Throwable) {
+            String s = "While performing a remote keySet";
+            exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+          } else {
+            result.addAll((List)o); // chunk payload is cast to a List and merged into the result set
+          }
+        } while (!keySetResponseMessage.isLastChunk());
+      } else {
+        if (msgType == MessageType.EXCEPTION) {
+          keySetResponseMessage.receiveChunk();
+          Part part = msg.getPart(0);
+          String s = "While performing a remote " +  "keySet";
+          throw new ServerOperationException(s, (Throwable) part.getObject());
+          // Get the exception toString part.
+          // This was added for c++ thin client and not used in java
+          // Part exceptionToStringPart = msg.getPart(1);
+        } else if (isErrorResponse(msgType)) {
+          keySetResponseMessage.receiveChunk();
+          Part part = msg.getPart(0);
+          throw new ServerOperationException(part.getString());
+        } else {
+          throw new InternalGemFireError("Unexpected message type "
+                                         + MessageType.getString(msgType));
+        }
+      }
+      
+      if (exceptionRef[0] != null) {
+        throw exceptionRef[0];
+      } else {
+        return result;
+      }
+    }
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.KEY_SET_DATA_ERROR;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startKeySet();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endKeySetSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endKeySet(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java
new file mode 100644
index 0000000..1f01cf5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LiveServerPinger.java
@@ -0,0 +1,108 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.client.internal.EndpointManager.EndpointListenerAdapter;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Responsible for pinging live
+ * servers to make sure they
+ * are still alive.
+ * @author dsmith
+ *
+ */
+public class LiveServerPinger  extends EndpointListenerAdapter {
+  private static final Logger logger = LogService.getLogger();
+  
+  private static final long NANOS_PER_MS = 1000000L; // nanoseconds per millisecond
+  
+  private final ConcurrentMap/*<Endpoint,Future>*/ taskFutures = new ConcurrentHashMap(); // scheduled ping task per endpoint
+  protected final InternalPool pool;
+  protected final long pingIntervalNanos;
+  
+  public LiveServerPinger(InternalPool pool) {
+    this.pool = pool;
+    this.pingIntervalNanos = pool.getPingInterval() * NANOS_PER_MS; // convert to nanoseconds (assumes getPingInterval() is in milliseconds)
+  }
+
+  @Override
+  public void endpointCrashed(Endpoint endpoint) {
+    cancelFuture(endpoint);
+  }
+
+  @Override
+  public void endpointNoLongerInUse(Endpoint endpoint) {
+    cancelFuture(endpoint);
+  }
+
+  @Override
+  public void endpointNowInUse(Endpoint endpoint) {
+    try {
+      Future future = pool.getBackgroundProcessor().scheduleWithFixedDelay(
+          new PingTask(endpoint), pingIntervalNanos, pingIntervalNanos, // initial delay and repeat delay are both the ping interval
+          TimeUnit.NANOSECONDS);
+      taskFutures.put(endpoint, future);
+    } catch (RejectedExecutionException e) {
+      if (pool.getCancelCriterion().cancelInProgress() == null) { // presumably null means no cancellation is in progress; otherwise the rejection is ignored
+        throw e;
+      }
+    }
+  }
+  
+  private void cancelFuture(Endpoint endpoint) {
+    Future future = (Future) taskFutures.remove(endpoint);
+    if(future != null) {
+      future.cancel(false); // false: do not interrupt a ping that is already running
+    }
+  }
+  
+  private class PingTask extends PoolTask {
+    private final Endpoint endpoint;
+    
+    public PingTask(Endpoint endpoint) {
+      this.endpoint = endpoint;
+    }
+
+    @Override
+    public void run2() {
+      if(endpoint.timeToPing(pingIntervalNanos)) {
+//      logger.fine("DEBUG pinging " + server);
+        try {
+          PingOp.execute(pool, endpoint.getLocation());
+        } catch(Exception e) {
+          if(logger.isDebugEnabled()) {
+            logger.debug("Error occured while pinging server: {} - {}", endpoint.getLocation(), e.getMessage());
+          }
+          GemFireCacheImpl cache = GemFireCacheImpl.getInstance();          
+          if (cache != null) {
+            ClientMetadataService cms = cache.getClientMetadataService();
+            cms.removeBucketServerLocation(endpoint.getLocation());
+          }        
+          //any failure to ping the server should be considered a crash (eg.
+          //socket timeout exception, security exception, failure to connect).
+          pool.getEndpointManager().serverCrashed(endpoint);
+        }
+      } else {
+//      logger.fine("DEBUG skipping ping of " + server
+//      + " because lastAccessed=" + endpoint.getLastAccessed());
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java
new file mode 100644
index 0000000..02ccbd9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallback.java
@@ -0,0 +1,38 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+/**
+ * A callback to receive notifications about locator discovery. Currently 
+ * only used internally.
+ * @author dsmith
+ * @since 5.7
+ */
+public interface LocatorDiscoveryCallback {
+  
+  /**
+   * Called to indicate that new locators
+   * have been discovered
+   * @param locators a list of InetSocketAddresses of new
+   * locators that have been discovered.
+   */
+  void locatorsDiscovered(List locators);
+  
+  /**
+   * Called to indicate that locators
+   * have been removed from the list
+   * of available locators.
+   * @param locators a list of InetSocketAddresses
+   * of locators that have been removed
+   */
+  void locatorsRemoved(List locators);
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java
new file mode 100644
index 0000000..b8ba3aa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/LocatorDiscoveryCallbackAdapter.java
@@ -0,0 +1,27 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+/**
+ * A locator discovery callback that does nothing.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class LocatorDiscoveryCallbackAdapter implements
+    LocatorDiscoveryCallback {
+
+  public void locatorsDiscovered(List locators) { // intentionally empty
+  }
+
+  public void locatorsRemoved(List locators) { // intentionally empty
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
new file mode 100644
index 0000000..79d2e1f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/MakePrimaryOp.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tell a server to become the primary host of a server-to-client queue
+ * @author darrel
+ * @since 5.7
+ */
+public class MakePrimaryOp {
+  /**
+   * Tell the given server to become the primary host of a server-to-client queue
+   * @param pool the pool to use to communicate with the server.
+   * @param conn the connection to do the execution on
+   * @param sentClientReady true if the client ready message has already been sent
+   */
+  public static void execute(ExecutablePool pool, Connection conn, boolean sentClientReady)
+  {
+    AbstractOp op = new MakePrimaryOpImpl(sentClientReady);
+    pool.executeOn(conn, op);
+  }
+                                                               
+  private MakePrimaryOp() {
+    // no instances allowed
+  }
+  
+  private static class MakePrimaryOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public MakePrimaryOpImpl(boolean sentClientReady) {
+      super(MessageType.MAKE_PRIMARY, 1);
+      getMessage().addBytesPart(new byte[] {(byte)(sentClientReady?0x01:0x00)}); // single-byte part: 1 if client ready was already sent, else 0
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception { // intentionally empty for this op
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART)); // keep only the MESSAGE_HAS_SECURE_PART bit(s) of the early-ack byte
+      getMessage().send(false);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "makePrimary");
+      return null; // this op produces no result value
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false; // no message type is reported as an error response by this op
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startMakePrimary();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endMakePrimarySend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endMakePrimary(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java
new file mode 100644
index 0000000..403a112
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Op.java
@@ -0,0 +1,34 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+/**
+ * An operation to perform on a server. Used by
+ * {@link ExecutablePool} to attempt the operation on 
+ * multiple servers until the retryAttempts is exceeded.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface Op {
+
+  /**
+   * Attempts to execute this operation by sending its message out on the
+   * given connection, waiting for a response, and returning it.
+   * @param cnx the connection to use when attempting execution of the operation.
+   * @return the result of the operation
+   *         or <code>null</code> if the operation has no result.
+   * @throws Exception if the execute failed
+   */
+  Object attempt(Connection cnx) throws Exception;
+
+  /**
+   * @return true if this Op should use a threadLocalConnection, false otherwise
+   */
+  boolean useThreadLocalConnection();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
new file mode 100644
index 0000000..967a8c3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/OpExecutorImpl.java
@@ -0,0 +1,964 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.BufferUnderflowException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.CopyException;
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.cache.CacheRuntimeException;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+import com.gemstone.gemfire.cache.TransactionException;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+import com.gemstone.gemfire.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import com.gemstone.gemfire.cache.client.internal.QueueManager.QueueConnections;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+import com.gemstone.gemfire.internal.cache.tier.BatchException;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.security.AuthenticationRequiredException;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * Called from the client to execute client-to-server
+ * requests against servers. Handles retrying on different servers,
+ * and marking servers dead if we get exceptions from them.
+ * @author dsmith
+ * @since 5.7
+ */
+public class OpExecutorImpl implements ExecutablePool {
+  private static final Logger logger = LogService.getLogger();
+  
+  private static final boolean TRY_SERVERS_ONCE = Boolean.getBoolean("gemfire.PoolImpl.TRY_SERVERS_ONCE");
+  private static final int TX_RETRY_ATTEMPT = Integer.getInteger("gemfire.txRetryAttempt", 500);
+  
+  private final ConnectionManager connectionManager;
+  private final int retryAttempts;
+  private final long serverTimeout;
+  private final boolean threadLocalConnections;
+  private final ThreadLocal<Connection> localConnection = new ThreadLocal<Connection>();
+  /**
+   * maps serverLocations to Connections when threadLocalConnections is enabled with single-hop.
+   */
+  private final ThreadLocal<Map<ServerLocation, Connection>> localConnectionMap = new ThreadLocal<Map<ServerLocation,Connection>>();
+  private final EndpointManager endpointManager;
+  private final RegisterInterestTracker riTracker;
+  private final QueueManager queueManager;
+  private final CancelCriterion cancelCriterion;
+  private /*final*/ PoolImpl pool;
+  private final ThreadLocal<Boolean> serverAffinity = new ThreadLocal<Boolean>() {
+    @Override
+    protected Boolean initialValue() {
+      return Boolean.FALSE;};
+  };
+  private boolean serverAffinityFailover = false;
+  private final ThreadLocal<ServerLocation> affinityServerLocation = new ThreadLocal<ServerLocation>();
+  private final ThreadLocal<Integer> affinityRetryCount = new ThreadLocal<Integer>() {
+    protected Integer initialValue() {
+      return 0;
+    };
+  };
+  
+  public OpExecutorImpl(ConnectionManager manager, QueueManager queueManager, EndpointManager endpointManager, RegisterInterestTracker riTracker, int retryAttempts,
+      long serverTimeout, boolean threadLocalConnections, CancelCriterion cancelCriterion, PoolImpl pool)  {
+    this.connectionManager = manager;
+    this.queueManager = queueManager;
+    this.endpointManager = endpointManager;
+    this.riTracker = riTracker;
+    this.retryAttempts = retryAttempts;
+    this.serverTimeout = serverTimeout;
+    this.threadLocalConnections = threadLocalConnections;
+    this.cancelCriterion = cancelCriterion;
+    this.pool = pool;
+  }  
+  
+  public Object execute(Op op) {
+    return execute(op, retryAttempts);
+  }
+  
+  public Object execute(Op op, int retries) {
+    if (this.serverAffinity.get()) {
+      ServerLocation loc = this.affinityServerLocation.get();
+      if (loc == null) {
+        loc = getNextOpServerLocation();
+        this.affinityServerLocation.set(loc);
+        if (logger.isDebugEnabled()) {
+          logger.debug("setting server affinity to {}", this.affinityServerLocation.get());
+        }
+      }
+      return executeWithServerAffinity(loc, op);
+    }
+    boolean success = false;
+    
+    Set attemptedServers = new HashSet();
+    
+    Connection conn = (Connection) (threadLocalConnections ? localConnection.get() : null);
+    if (conn == null || conn.isDestroyed()) {
+      conn = connectionManager.borrowConnection(serverTimeout);
+    }
+    else if (threadLocalConnections) {
+      //Fix for 43718. Clear the thread local connection
+      //while we're performing the op. It will be reset
+      //if the op succeeds.
+      localConnection.set(null);
+      try {
+        this.connectionManager.activate(conn);
+      }
+      catch (ConnectionDestroyedException ex) {
+        conn = connectionManager.borrowConnection(serverTimeout);
+      }
+    }
+    try {
+      for(int attempt = 0; true; attempt++) {
+        // when an op is retried we may need to try to recover the previous
+        // attempt's version stamp
+        if (attempt == 1 && (op instanceof AbstractOp)) {
+          AbstractOp absOp = (AbstractOp)op;
+          absOp.getMessage().setIsRetry();
+        }
+        try {
+          authenticateIfRequired(conn, op);
+          Object result = executeWithPossibleReAuthentication(conn, op);
+          success = true;
+          return result;
+        }
+        catch (Exception e) {
+          //This method will throw an exception if we need to stop
+          //It also unsets the threadlocal connection and notifies
+          //the connection manager if there are failures.
+          handleException(e, conn, attempt, attempt >= retries && retries != -1);
+          attemptedServers.add(conn.getServer());
+          try {
+            conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
+          }
+          catch(NoAvailableServersException nse) {
+            //if retries is -1, don't try again after the last server has failed
+            if(retries == -1 || TRY_SERVERS_ONCE) {
+              handleException(e, conn, attempt, true);
+            }
+            else {
+              //try one of the failed servers again, until we exceed the retry attempts.
+              attemptedServers.clear();
+              try {
+                conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
+              }
+              catch(NoAvailableServersException nse2) {
+                handleException(e, conn, attempt, true);
+              }
+            }
+          }
+        }
+      }
+    } finally {
+      if(threadLocalConnections) {
+        this.connectionManager.passivate(conn, success);
+        //Fix for 43718. If the thread local was set to a different
+        //connection deeper in the call stack, return that connection
+        //and set our connection on the thread local.
+        Connection existingConnection = localConnection.get();
+        if(existingConnection != null && existingConnection != conn) {
+          connectionManager.returnConnection(existingConnection);
+        }
+        
+        if(!conn.isDestroyed()) {
+          localConnection.set(conn);
+        } else {
+          localConnection.set(null);
+        }
+      } else {
+        connectionManager.returnConnection(conn);
+      }
+    }
+  }
+
+  /**
+   * execute the given op on the given server. If the server cannot
+   * be reached, sends a TXFailoverOp, then retries the given op
+   * @param loc the server to execute the op on
+   * @param op the op to execute
+   * @return the result of execution
+   */
+  private Object executeWithServerAffinity(ServerLocation loc, Op op) {
+    try {
+      Object retVal = executeOnServer(loc, op, true, false);
+      affinityRetryCount.set(0);
+      return retVal;
+    } catch (ServerConnectivityException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("caught exception while executing with affinity:{}", e.getMessage(), e);
+      }
+      if (!this.serverAffinityFailover || e instanceof ServerOperationException) {
+        affinityRetryCount.set(0);
+        throw e;
+      }
+      int retryCount = affinityRetryCount.get();
+      if ((retryAttempts != -1 && retryCount >= retryAttempts) ||
+          retryCount > TX_RETRY_ATTEMPT) { // prevent stack overflow fixes bug 46535
+        affinityRetryCount.set(0);
+        throw e;
+      }
+      affinityRetryCount.set(retryCount + 1);
+    }
+    this.affinityServerLocation.set(null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("reset server affinity: attempting txFailover");
+    }
+    // send TXFailoverOp, so that new server can
+    // do bootstrapping, then re-execute original op
+    AbstractOp absOp = (AbstractOp) op;
+    absOp.getMessage().setIsRetry();
+    int transactionId = absOp.getMessage().getTransactionId();
+    // for CommitOp we do not have transactionId in AbstractOp
+    // so set it explicitly for TXFailoverOp
+    try {
+      TXFailoverOp.execute(this.pool, transactionId);
+    } catch (TransactionException e) {
+      // If this is the first operation in the transaction then
+      // do not throw TransactionDataNodeHasDeparted back to the
+      // user, re-try the op instead. fixes bug 44375. NOTE: TXFailoverOp
+      // is sent even after first op, as it is not known if the first
+      // operation has established a TXState already
+      TXStateProxy txState = TXManagerImpl.getCurrentTXState();
+      if (txState == null) {
+        throw e;
+      } else if (txState.operationCount() > 1) {
+        throw e;
+      }
+    }
+    if(op instanceof ExecuteRegionFunctionOpImpl){
+      op = new ExecuteRegionFunctionOpImpl(
+          (ExecuteRegionFunctionOpImpl)op, (byte)1/*isReExecute*/, new HashSet<String>());
+      ((ExecuteRegionFunctionOpImpl)op).getMessage().setTransactionId(transactionId);
+    }else if (op instanceof ExecuteFunctionOpImpl){
+      op = new ExecuteFunctionOpImpl(
+          (ExecuteFunctionOpImpl)op, (byte)1/*isReExecute*/);
+      ((ExecuteFunctionOpImpl)op).getMessage().setTransactionId(transactionId);
+    }
+    return this.pool.execute(op);
+  }
+
+  /**
+   * Turns server affinity on for the calling thread and records whether
+   * failover is allowed when an affinity-bound op fails.
+   * @param allowFailover stored in {@code serverAffinityFailover}
+   */
+  public void setupServerAffinity(boolean allowFailover) {
+    final boolean debug = logger.isDebugEnabled();
+    if (debug) {
+      logger.debug("setting up server affinity");
+    }
+    serverAffinityFailover = allowFailover;
+    serverAffinity.set(Boolean.TRUE);
+  }
+  
+  /**
+   * Turns server affinity off for the calling thread and clears the
+   * remembered affinity server location.
+   */
+  public void releaseServerAffinity() {
+    final boolean debug = logger.isDebugEnabled();
+    if (debug) {
+      logger.debug("reset server affinity");
+    }
+    serverAffinity.set(Boolean.FALSE);
+    affinityServerLocation.set(null);
+  }
+  
+  /**
+   * @return the affinity server location currently held for the calling
+   *         thread, or null when none has been set
+   */
+  public ServerLocation getServerAffinityLocation() {
+    final ServerLocation pinned = this.affinityServerLocation.get();
+    return pinned;
+  }
+  
+  /**
+   * Records the affinity server location for the calling thread. With
+   * assertions enabled, fails if a location is already set.
+   * @param serverLocation the location to remember
+   */
+  public void setServerAffinityLocation(ServerLocation serverLocation) {
+    assert affinityServerLocation.get() == null;
+    affinityServerLocation.set(serverLocation);
+  }
+  
+  /**
+   * Reports the server of the live thread-local connection when there is one;
+   * otherwise borrows a connection, reads its server, and returns the
+   * connection to the manager straight away.
+   * @return the server location of the connection that was inspected
+   */
+  public ServerLocation getNextOpServerLocation() {
+    final Connection cached = threadLocalConnections ? localConnection.get() : null;
+    if (cached != null && !cached.isDestroyed()) {
+      return cached.getServer();
+    }
+    // no usable thread-local connection: peek at a borrowed one
+    final Connection borrowed = connectionManager.borrowConnection(serverTimeout);
+    final ServerLocation location = borrowed.getServer();
+    this.connectionManager.returnConnection(borrowed);
+    return location;
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.OpExecutor#executeOn(com.gemstone.gemfire.distributed.internal.ServerLocation, com.gemstone.gemfire.cache.client.internal.Op)
+   */
+  public Object executeOn(ServerLocation server, Op op) {
+    // convenience form: accessed=true, onlyUseExistingCnx=false
+    final boolean accessed = true;
+    final boolean onlyUseExistingCnx = false;
+    return executeOn(server, op, accessed, onlyUseExistingCnx);
+  }
+  /**
+   * Executes the op on the given server, unless server affinity is active for
+   * the calling thread. With affinity active, the op goes to the remembered
+   * affinity server (or {@code p_server} is remembered when none is set yet)
+   * through {@code executeWithServerAffinity}.
+   * @param p_server the requested server
+   * @param op the op to execute
+   * @param accessed passed through to {@code executeOnServer}; ignored when affinity is active
+   * @param onlyUseExistingCnx passed through to {@code executeOnServer}; ignored when affinity is active
+   * @return the result of execution
+   */
+  public Object executeOn(ServerLocation p_server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+    if (!this.serverAffinity.get()) {
+      return executeOnServer(p_server, op, accessed, onlyUseExistingCnx);
+    }
+    ServerLocation target = this.affinityServerLocation.get();
+    if (target == null) {
+      // first op under affinity: pin the thread to the requested server
+      target = p_server;
+      this.affinityServerLocation.set(target);
+    }
+    // redirect to executeWithServerAffinity so that we
+    // can send a TXFailoverOp.
+    return executeWithServerAffinity(target, op);
+  }
+  private Object executeOnServer(ServerLocation p_server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+    ServerLocation server = p_server;
+    boolean returnCnx = true;
+    boolean pingOp = (op instanceof PingOp.PingOpImpl);
+    Connection conn = null;
+    if (pingOp) {
+      // currently for pings we prefer to queue clientToServer cnx so that we will
+      // not create a pooled cnx when all we have is queue cnxs.
+      if (this.queueManager != null) {
+        // see if our QueueManager has a connection to this guy that we can send
+        // the ping on.
+        Endpoint ep = (Endpoint)this.endpointManager.getEndpointMap().get(server);
+        if (ep != null) {
+          QueueConnections qcs = this.queueManager.getAllConnectionsNoWait();
+          conn = qcs.getConnection(ep);
+          if (conn != null) {
+            // we found one to do the ping on
+            returnCnx = false;
+          }
+        }
+      }
+    }
+    if (conn == null) {
+      if (useThreadLocalConnection(op, pingOp)) {
+        // no need to set threadLocal to null while the op is in progress since
+        // 43718 does not impact single-hop
+        conn = getActivatedThreadLocalConnectionForSingleHop(server, onlyUseExistingCnx);
+        returnCnx = false;
+      } else {
+        conn = connectionManager.borrowConnection(server, serverTimeout,onlyUseExistingCnx);
+      }
+    }
+    boolean success = true;
+    try {
+      return executeWithPossibleReAuthentication(conn, op);
+    } catch (Exception e) {
+      success = false;
+      //This method will throw an exception if we need to stop
+      //It also unsets the threadlocal connection and notifies
+      //the connection manager if there are failures.
+      handleException(e, conn, 0, true);
+      //this shouldn't actually be reached, handle exception will throw something
+      throw new ServerConnectivityException("Received error connecting to server", e);
+    } finally {
+      if (this.serverAffinity.get() && this.affinityServerLocation.get() == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("setting server affinity to {} server:{}", conn.getEndpoint().getMemberId(), conn.getServer());
+        }
+        this.affinityServerLocation.set(conn.getServer());
+      }
+      if (useThreadLocalConnection(op, pingOp)) {
+        this.connectionManager.passivate(conn, success);
+        setThreadLocalConnectionForSingleHop(server, conn);
+      }
+      if (returnCnx) {
+        connectionManager.returnConnection(conn, accessed);
+      }
+    }
+  }
+
+  /**
+   * @return true only when thread-local connections are enabled, the op is not
+   *         a ping, and the op itself opts in via {@code useThreadLocalConnection()}
+   */
+  private boolean useThreadLocalConnection(Op op, boolean pingOp) {
+    if (!threadLocalConnections || pingOp) {
+      return false;
+    }
+    return op.useThreadLocalConnection();
+  }
+
+  /**
+   * Gets a connection to the given serverLocation by looking it up in the threadLocal {@link #localConnectionMap}.
+   * If no connection is cached there, or the cached one has been destroyed or now points at a
+   * different server, one is borrowed from connectionManager instead.
+   * @return the activated connection
+   */
+  private Connection getActivatedThreadLocalConnectionForSingleHop(ServerLocation server, boolean onlyUseExistingCnx) {
+    assert threadLocalConnections;
+    Connection conn = null;
+    Map<ServerLocation, Connection> connMap = this.localConnectionMap.get();
+    if (connMap != null && !connMap.isEmpty()) {
+      conn = connMap.get(server);
+    }
+    boolean borrow = true;
+    if (conn != null) {
+      try {
+        this.connectionManager.activate(conn);
+        borrow = false;
+        if (!conn.getServer().equals(server)) {
+          // poolLoadConditioningMonitor can replace the connection's
+          // endpoint from underneath us. fixes bug 45151
+          borrow = true;
+        }
+      } catch (ConnectionDestroyedException e) {
+      }
+    }
+    if (conn == null || borrow) {
+      conn = connectionManager.borrowConnection(server, serverTimeout, onlyUseExistingCnx);
+    }
+    if (borrow && connMap != null) {
+      connMap.remove(server);
+    }
+    return conn;
+  }
+  
+  /**
+   * initializes the threadLocal {@link #localConnectionMap} and adds mapping
+   * of serverLocation to Connection.
+   */
+  private void setThreadLocalConnectionForSingleHop(ServerLocation server,
+      Connection conn) {
+    assert threadLocalConnections;
+    Map<ServerLocation, Connection> perThread = this.localConnectionMap.get();
+    if (perThread != null) {
+      perThread.put(server, conn);
+      return;
+    }
+    // first single-hop connection on this thread: create the map lazily
+    perThread = new HashMap<ServerLocation, Connection>();
+    this.localConnectionMap.set(perThread);
+    perThread.put(server, conn);
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ExecutablePool#executeOnPrimary(com.gemstone.gemfire.cache.client.internal.Op)
+   */
+  public Object executeOnPrimary(Op op) {
+    if (queueManager == null) {
+      throw new SubscriptionNotEnabledException();
+    }
+
+    // servers of primaries that already failed; seeing one twice ends the loop
+    final HashSet<ServerLocation> failedPrimaries = new HashSet<ServerLocation>();
+    for (;;) {
+      final Connection primaryCnx = queueManager.getAllConnections().getPrimary();
+      try {
+        return executeWithPossibleReAuthentication(primaryCnx, op);
+      } catch (Exception problem) {
+        final boolean seenBefore = !failedPrimaries.add(primaryCnx.getServer());
+        handleException(problem, primaryCnx, 0, seenBefore);
+        //we shouldn't reach this code, but just in case
+        if (seenBefore) {
+          throw new ServerConnectivityException("Tried the same primary server twice.", problem);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Executes the op on the primary queue connection (when there is one) and
+   * then on every backup queue connection. A failure on one connection does
+   * not stop the others; the last RuntimeException raised by
+   * {@code handleException} is rethrown once all connections have been tried.
+   * @param op the op to execute
+   * @throws SubscriptionNotEnabledException if there is no queue manager
+   */
+  public void executeOnAllQueueServers(Op op) {
+    if (queueManager == null) {
+      throw new SubscriptionNotEnabledException();
+    }
+
+    RuntimeException pending = null;
+    final QueueConnections queueCnxs = queueManager.getAllConnectionsNoWait();
+
+    final Connection primaryCnx = queueCnxs.getPrimary();
+    if (primaryCnx != null) {
+      try {
+        executeWithPossibleReAuthentication(primaryCnx, op);
+      } catch (Exception problem) {
+        try {
+          handleException(problem, primaryCnx, 0, false);
+        } catch (RuntimeException rethrown) {
+          pending = rethrown;
+        }
+      }
+    }
+
+    final List backupCnxs = queueCnxs.getBackups();
+    for (int idx = 0; idx < backupCnxs.size(); idx++) {
+      final Connection backupCnx = (Connection) backupCnxs.get(idx);
+      try {
+        executeWithPossibleReAuthentication(backupCnx, op);
+      } catch (Exception problem) {
+        try {
+          handleException(problem, backupCnx, 0, false);
+        } catch (RuntimeException rethrown) {
+          pending = rethrown;
+        }
+      }
+    }
+
+    if (pending != null) {
+      throw pending;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.ExecutablePool#executeOnQueuesAndReturnPrimaryResult(com.gemstone.gemfire.cache.client.internal.Op)
+   */
+  public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
+    if (queueManager == null) {
+      throw new SubscriptionNotEnabledException();
+    }
+    final QueueConnections queueCnxs = queueManager.getAllConnections();
+
+    // backups first, walked from the end of the list to the front
+    final List backupCnxs = queueCnxs.getBackups();
+    if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+      logger.trace(LogMarker.BRIDGE_SERVER, "sending {} to backups: {}", op, backupCnxs);
+    }
+    int idx = backupCnxs.size() - 1;
+    while (idx >= 0) {
+      final Connection backupCnx = (Connection) backupCnxs.get(idx);
+      try {
+        executeWithPossibleReAuthentication(backupCnx, op);
+      } catch (Exception problem)  {
+        handleException(problem, backupCnx, 0, false);
+      }
+      idx--;
+    }
+
+    // then the primary, re-fetched after each failure until one server fails twice
+    Connection primaryCnx = queueCnxs.getPrimary();
+    final HashSet<ServerLocation> failedPrimaries = new HashSet<ServerLocation>();
+    for (;;) {
+      try {
+        if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+          logger.trace(LogMarker.BRIDGE_SERVER, "sending {} to primary: {}", op, primaryCnx);
+        }
+        return executeWithPossibleReAuthentication(primaryCnx, op);
+      } catch (Exception problem) {
+        if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
+          logger.trace(LogMarker.BRIDGE_SERVER, "caught exception sending to primary {}", problem.getMessage(), problem);
+        }
+        final boolean seenBefore = !failedPrimaries.add(primaryCnx.getServer());
+        handleException(problem, primaryCnx, 0, seenBefore);
+        primaryCnx = queueManager.getAllConnections().getPrimary();
+        //we shouldn't reach this code, but just in case
+        if (seenBefore) {
+          throw new ServerConnectivityException("Tried the same primary server twice.", problem);
+        }
+      }
+    }
+  }
+
+  /**
+   * Clears both thread-local connection holders for the calling thread and
+   * returns every connection they held to the connection manager.
+   */
+  public void releaseThreadLocalConnection() {
+    final Connection single = localConnection.get();
+    localConnection.set(null);
+    if (single != null) {
+      connectionManager.returnConnection(single);
+    }
+
+    final Map<ServerLocation, Connection> perServer = localConnectionMap.get();
+    localConnectionMap.set(null);
+    if (perServer == null) {
+      return;
+    }
+    for (Connection held : perServer.values()) {
+      connectionManager.returnConnection(held);
+    }
+  }
+
+  /**
+   * Used by GatewayBatchOp
+   */
+  public Object executeOn(Connection conn, Op op, boolean timeoutFatal) {
+    try {
+      return executeWithPossibleReAuthentication(conn, op);
+    } catch (Exception e) {
+      //This method will throw an exception if we need to stop
+      //It also unsets the threadlocal connection and notifies
+      //the connection manager if there are failures.
+      handleException(op, e, conn, 0,  true, timeoutFatal);
+      //this shouldn't actually be reached, handle exception will throw something
+      throw new ServerConnectivityException("Received error connecting to server", e);
+    } 
+  }
+  /**
+   * This is used by unit tests
+   */
+  public Object executeOn(Connection conn, Op op) {
+    // same as the three-argument form with timeoutFatal=false
+    final boolean timeoutFatal = false;
+    return executeOn(conn, op, timeoutFatal);
+  }
+
+  /**
+   * @return the register-interest tracker held by this executor
+   */
+  public RegisterInterestTracker getRITracker() {
+    return this.riTracker;
+  }
+  
+  protected void handleException(Throwable e, 
+                                 Connection conn,
+                                 int retryCount, boolean finalAttempt) {
+    handleException(e, conn, retryCount, finalAttempt, false/*timeoutFatal*/);
+  }
+
+  protected void handleException(Op op, 
+                                 Throwable e,
+                                 Connection conn,
+                                 int retryCount,
+                                 boolean finalAttempt,
+                                 boolean timeoutFatal)
+  throws CacheRuntimeException {
+    if (op instanceof AuthenticateUserOp.AuthenticateUserOpImpl) {
+      if (e instanceof GemFireSecurityException) {
+        throw (GemFireSecurityException)e;
+      } else if (e instanceof ServerRefusedConnectionException) {
+        throw (ServerRefusedConnectionException)e;
+      }
+    }
+    handleException(e, conn, retryCount, finalAttempt, timeoutFatal);
+  }
+
+  protected void handleException(Throwable e, 
+                                 Connection conn,
+                                 int retryCount,
+                                 boolean finalAttempt,
+                                 boolean timeoutFatal)
+  throws CacheRuntimeException 
+  {
+    GemFireException exToThrow = null;
+    String title;
+    boolean invalidateServer = true;
+    boolean warn = true;
+    boolean forceThrow = false;
+    Throwable cause = e;
+    
+    cancelCriterion.checkCancelInProgress(e);
+
+    if(logger.isDebugEnabled() && !(e instanceof java.io.EOFException)) {
+      if (e instanceof java.io.EOFException){
+        logger.debug("OpExecutor.handleException on Connection to {} found EOF", conn.getServer());
+      } else if (e instanceof java.net.SocketTimeoutException) {
+        logger.debug("OpExecutor.handleException on Connection to {} read timed out", conn.getServer());
+      } else {
+        logger.debug("OpExecutor.handleException on Connection to {}", conn.getServer(),e);
+      }
+    }
+    if (e instanceof NotSerializableException) {
+      title = null; //no message
+      exToThrow = new SerializationException("Pool message failure", e);
+    }
+    else if (e instanceof BatchException || e instanceof BatchException70) {
+      title = null; //no message
+      exToThrow = new ServerOperationException(e);
+    }
+    else if (e instanceof RegionDestroyedException) {
+      invalidateServer = false;
+      title = null;
+      exToThrow =(RegionDestroyedException) e;
+    }
+    else if (e instanceof GemFireSecurityException) {
+      title = null;
+      exToThrow = new ServerOperationException(e);
+    }
+    else if (e instanceof SerializationException) {
+      title = null; // no message
+      exToThrow = new ServerOperationException(e);
+    }
+    else if (e instanceof CopyException) {
+      title = null; // no message
+      exToThrow = new ServerOperationException(e);
+    }
+    else if (e instanceof ClassNotFoundException) {
+      title = null; // no message
+      exToThrow = new ServerOperationException(e);
+    }
+    else if (e instanceof TransactionException) {
+      title = null; // no message
+      exToThrow = (TransactionException)e;
+      invalidateServer = false;
+    }
+    else if (e instanceof SynchronizationCommitConflictException) {
+      title = null;
+      exToThrow = (SynchronizationCommitConflictException)e;
+      invalidateServer = false;
+    }
+    else if (e instanceof SocketException) {
+      if ("Socket closed".equals(e.getMessage())
+          || "Connection reset".equals(e.getMessage())
+          || "Connection refused: connect".equals(e.getMessage())
+          || "Connection refused".equals(e.getMessage())) {
+        title = e.getMessage();
+      } else {
+        title = "SocketException";
+      }
+    }
+    else if (e instanceof SocketTimeoutException) {
+      invalidateServer = timeoutFatal;
+      title = "socket timed out on client";
+      cause = null;
+    }
+    else if (e instanceof ConnectionDestroyedException) {
+      invalidateServer = false;
+      title = "connection was asynchronously destroyed";
+      cause = null;
+    }
+    else if (e instanceof java.io.EOFException) {
+      /*
+//      it is still listening so make this into a timeout exception
+        invalidateServer = false;
+        title = "socket closed on server";
+        SocketTimeoutException ste = new SocketTimeoutException(title);
+        ste.setStackTrace(e.getStackTrace());
+        e = ste;
+        cause = null;
+        */ 
+      
+      /*
+       * note: the old code in ConnectionProxyImpl used to create a new socket here to the server to determine if it really crashed.
+       * We may have to add this back in for some reason, but hopefully not.
+       * 
+       * note 05/21/08: an attempt to address this was made by increasing the time waited on server before closing timeoutd clients
+       * see ServerConnection.hasBeenTimedOutOnClient
+       */
+      title = "closed socket on server";
+    }
+    else if (e instanceof IOException) {
+      title = "IOException";
+    }
+    else if (e instanceof BufferUnderflowException) {
+      title = "buffer underflow reading from server";
+    }
+    else if (e instanceof CancelException) {
+      title = "Cancelled";
+      warn = false;
+    }
+    else if (e instanceof InternalFunctionInvocationTargetException) {  
+      //In this case, function will be re executed
+      title = null;
+      exToThrow = (InternalFunctionInvocationTargetException)e;
+    }
+    else if (e instanceof FunctionInvocationTargetException) {  
+      //in this case function will not be re executed
+      title = null; 
+      exToThrow = (GemFireException)e;
+    }
+    else if (e instanceof PutAllPartialResultException) {
+      title = null;
+      exToThrow =(PutAllPartialResultException) e;
+      invalidateServer = false;
+    }
+    else {
+      Throwable t = e.getCause();
+      if ((t instanceof ConnectException)
+          || (t instanceof SocketException)
+          || (t instanceof SocketTimeoutException)
+          || (t instanceof IOException)
+          || (t instanceof SerializationException)
+          || (t instanceof CopyException)
+          || (t instanceof GemFireSecurityException)
+          || (t instanceof ServerOperationException)
+          || (t instanceof TransactionException)
+          || (t instanceof CancelException)) {
+        handleException(t,  conn, retryCount, finalAttempt, timeoutFatal);
+        return;
+      } else if (e instanceof ServerOperationException) {
+          title = null; // no message
+          exToThrow = (ServerOperationException)e;
+          invalidateServer = false; // fix for bug #42225
+      }
+      else if (e instanceof FunctionException) {
+        if (t instanceof InternalFunctionInvocationTargetException) {
+          // Client server to re-execute for node failure
+          handleException(t, conn, retryCount, finalAttempt, timeoutFatal);
+          return;
+        }
+        else {
+          title = null; // no message
+          exToThrow = (FunctionException)e;
+        }
+      } else if (e instanceof ServerConnectivityException
+          && e.getMessage()
+              .equals("Connection error while authenticating user")) {
+        title = null;
+        if (logger.isDebugEnabled()) {
+          logger.debug(e.getMessage(), e);
+        }
+      } else {
+        title = e.toString();
+        forceThrow = true;
+      }
+    }
+    if (title != null) {
+      conn.destroy();
+      if(invalidateServer) {
+        endpointManager.serverCrashed(conn.getEndpoint());
+      }
+      boolean logEnabled = warn ? logger.isWarnEnabled() : logger.isDebugEnabled();
+      boolean msgNeeded = logEnabled || finalAttempt;
+      if (msgNeeded) {
+        final StringBuffer sb = getExceptionMessage(title, retryCount, finalAttempt, conn, e);
+        final String msg = sb.toString();
+        if (logEnabled) {
+          if (warn) {
+            logger.warn(msg /*, e*/);
+          } else {
+            logger.debug(msg /*, e*/);
+          }
+        }
+        if (forceThrow || finalAttempt) {
+          exToThrow = new ServerConnectivityException(msg, cause);
+        }
+      }
+    }
+    if (exToThrow != null) {
+      throw exToThrow;
+    }
+  }
+  
+  private StringBuffer getExceptionMessage(String exceptionName, 
+      int retryCount,
+      boolean finalAttempt,
+      Connection connection,
+      Throwable ex) {
+    StringBuffer message = new StringBuffer(200);
+    message
+    .append("Pool unexpected ")
+    .append(exceptionName);
+    if (connection != null) {
+      message
+      .append(" connection=")
+      .append(connection);
+    }
+    if (retryCount > 0) {
+      message
+      .append(" attempt=")
+      .append(retryCount+1);
+    }
+    message.append(')');
+    if (finalAttempt) {
+      message
+      .append(". Server unreachable: could not connect after ")
+      .append(retryCount+1)
+      .append(" attempts");
+    }
+    return message;
+  }
+
+  /**
+   * @return the connection currently held in {@code localConnection} for the
+   *         calling thread, possibly null
+   */
+  public Connection getThreadLocalConnection() {
+    final Connection held = localConnection.get();
+    return held;
+  }
+
+  /**
+   * Stores {@code conn} in {@code localConnection} for the calling thread,
+   * replacing whatever was there.
+   */
+  public void setThreadLocalConnection(Connection conn) {
+    this.localConnection.set(conn);
+  }
+
+  private void authenticateIfRequired(Connection conn, Op op) {
+    if (!conn.getServer().getRequiresCredentials()) {
+      return;
+    }
+    
+    if (this.pool == null) {
+      PoolImpl poolImpl = (PoolImpl)PoolManagerImpl.getPMI().find(
+          this.endpointManager.getPoolName());
+      if (poolImpl == null) {
+        return;
+      }
+      this.pool = poolImpl;
+    }
+    if (this.pool.getMultiuserAuthentication()) {
+      if (((AbstractOp)op).needsUserId()) {
+        UserAttributes ua = UserAttributes.userAttributes.get();
+        if (ua != null) {
+          if (!ua.getServerToId().containsKey(conn.getServer())) {
+            authenticateMultiuser(this.pool, conn, ua);
+          }
+        } else {
+          // This should never be reached.
+        }
+      }
+    } else if (((AbstractOp)op).needsUserId()) {
+      // This should not be reached, but keeping this code here in case it is
+      // reached.
+      if (conn.getServer().getUserId() == -1) {
+        Connection connImpl = this.connectionManager.getConnection(conn);
+        conn.getServer().setUserId(
+            (Long)AuthenticateUserOp.executeOn(connImpl, this.pool));
+        if (logger.isDebugEnabled()) {
+          logger.debug("OpExecutorImpl.execute() - single user mode - authenticated this user on {}", conn);
+        }
+      }
+    }
+  }
+
+  private void authenticateMultiuser(PoolImpl pool, Connection conn,
+      UserAttributes ua) {
+    try {
+      Long userId = (Long)AuthenticateUserOp.executeOn(conn.getServer(),
+          pool, ua.getCredentials());
+      if (userId != null) {
+        ua.setServerToId(conn.getServer(), userId);
+        if (logger.isDebugEnabled()) {
+          logger.debug("OpExecutorImpl.execute() - multiuser mode - authenticated this user on {}", conn);
+        }
+      }
+    } catch (ServerConnectivityException sce) {
+      Throwable cause = sce.getCause();
+      if (cause instanceof SocketException
+          || cause instanceof EOFException
+          || cause instanceof IOException
+          || cause instanceof BufferUnderflowException
+          || cause instanceof CancelException
+          || (sce.getMessage() != null && (sce.getMessage().indexOf(
+              "Could not create a new connection to server") != -1
+              || sce.getMessage().indexOf("socket timed out on client") != -1 || sce
+              .getMessage().indexOf(
+                  "connection was asynchronously destroyed") != -1))) {
+        throw new ServerConnectivityException(
+            "Connection error while authenticating user");
+      } else {
+        throw sce;
+      }
+    }
+  }
+
+  /**
+   * Executes the op on the connection. If that fails with a
+   * ServerConnectivityException indicating that the user is not (or no
+   * longer) authenticated, re-authenticates and executes the op once more.
+   * Any other ServerConnectivityException is rethrown unchanged.
+   *
+   * @param conn the connection to execute on
+   * @param op the op to execute
+   * @return the result of {@code conn.execute(op)}
+   * @throws Exception whatever the execution or the re-authentication raises
+   */
+  private Object executeWithPossibleReAuthentication(Connection conn, Op op)
+      throws Exception {
+    try {
+      return conn.execute(op);
+
+    } catch (ServerConnectivityException sce) {
+      Throwable cause = sce.getCause();
+      // the message may be null; testing it unguarded would replace the real
+      // failure with a NullPointerException
+      String message = sce.getMessage();
+      if ((cause instanceof AuthenticationRequiredException
+          && "User authorization attributes not found.".equals(cause
+              .getMessage())) 
+          || (message != null && message.contains(
+              "Connection error while authenticating user"))) {
+        // (ashetkar) Need a cleaner way of doing above check.
+        // 2nd exception-message above is from AbstractOp.sendMessage()
+
+        PoolImpl pool = (PoolImpl)PoolManagerImpl.getPMI().find(
+            this.endpointManager.getPoolName());
+        if (!pool.getMultiuserAuthentication()) {
+          // single user: obtain a fresh user id for this server, then retry
+          Connection connImpl = this.connectionManager.getConnection(conn);
+          conn.getServer().setUserId(
+              (Long)AuthenticateUserOp.executeOn(connImpl, this));
+          return conn.execute(op);
+        } else {
+          // multiuser: re-authenticate the calling thread's user if known, then retry
+          UserAttributes ua = UserAttributes.userAttributes.get();
+          if (ua != null) {
+            authenticateMultiuser(pool, conn, ua);
+          }
+          return conn.execute(op);
+        }
+
+      } else {
+        throw sce;
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java
new file mode 100644
index 0000000..c70e312
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PdxRegistryRecoveryListener.java
@@ -0,0 +1,81 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.pdx.internal.TypeRegistry;
+
+/**
+ * A listener which will wipe out the PDX registry on the client side if the
+ * entire server distributed system was lost and came back on line. <br>
+ * <br>
+ * TODO - There is a window in which all of the servers could crash and come
+ * back up and we would connect to a new server before realizing that all the
+ * servers crashed. To fix this, we would need to get some kind of birthdate of
+ * the server ds we connect to and use that to decide if we need to recover
+ * the PDX registry.
+ * 
+ * We can also lose connectivity with the servers, even if the servers are still
+ * running. Maybe for the PDX registry we need some way of telling if the PDX
+ * registry was lost at the server side in the interval. 
+ * 
+ * 
+ * @author dsmith
+ * 
+ */
+public class PdxRegistryRecoveryListener extends EndpointManager.EndpointListenerAdapter {
+  private static final Logger logger = LogService.getLogger();
+  
+  /** Running count of endpoints: incremented on "now in use", decremented on crash or "no longer in use". */
+  private final AtomicInteger endpointCount = new AtomicInteger();
+  /** The pool handed to the constructor; not read by any method of this class. */
+  private final InternalPool pool;
+  
+  public PdxRegistryRecoveryListener(InternalPool pool) {
+    this.pool = pool;
+  }
+  
+  /** Counts the crashed endpoint out. */
+  @Override
+  public void endpointCrashed(Endpoint endpoint) {
+    final int remaining = endpointCount.decrementAndGet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("PdxRegistryRecoveryListener - EndpointCrashed. Now have {} endpoints", remaining);
+    }
+  }
+
+  /** Counts the retired endpoint out. */
+  @Override
+  public void endpointNoLongerInUse(Endpoint endpoint) {
+    final int remaining = endpointCount.decrementAndGet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("PdxRegistryRecoveryListener - EndpointNoLongerInUse. Now have {} endpoints", remaining);
+    }
+  }
+
+  /**
+   * Counts the endpoint in. When the count becomes exactly one, the PDX type
+   * registry of the current cache (if a cache and a registry exist) is cleared.
+   */
+  @Override
+  public void endpointNowInUse(Endpoint endpoint) {
+    final int current = endpointCount.incrementAndGet();
+    if (logger.isDebugEnabled()) {
+      logger.debug("PdxRegistryRecoveryListener - EndpointNowInUse. Now have {} endpoints", current);
+    }
+    if (current != 1) {
+      return;
+    }
+    final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    final TypeRegistry registry = (cache == null) ? null : cache.getPdxRegistry();
+    if (registry != null) {
+      registry.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
new file mode 100644
index 0000000..ac32e39
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PingOp.java
@@ -0,0 +1,104 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Ping a server to see if it is still alive.
+ * @author darrel
+ * @since 5.7
+ */
+public class PingOp {
+  /**
+   * Ping the specified server to see if it is still alive
+   * @param pool the pool to use to communicate with the server.
+   * @param server the server to do the execution on
+   */
+  public static void execute(ExecutablePool pool, ServerLocation server)
+  {
+    final AbstractOp ping = new PingOpImpl();
+    pool.executeOn(server, ping, false, false);
+  }
+
+  private PingOp() {
+    // no instances allowed
+  }
+
+  /**
+   * The operation object handed to the pool by {@link PingOp#execute}.
+   */
+  static class PingOpImpl extends AbstractOp {
+
+    /** Value of System.currentTimeMillis() taken in sendMessage just before the ping is sent. */
+    private long startTime;
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public PingOpImpl() {
+      super(MessageType.PING, 0);
+    }
+
+    /**
+     * Does nothing with the connection or message; only resets
+     * Message.messageType to null.
+     */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      Message.messageType.set(null);
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Sends the ping message, remembering the send time so that
+     * processResponse can estimate the clock offset from the server.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      // keep only those bits of the early-ack byte that are also set in
+      // Message.MESSAGE_HAS_SECURE_PART
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      this.startTime = System.currentTimeMillis();
+      getMessage().send(false);
+      // set only after a successful send; processSecureBytes resets it to null
+      Message.messageType.set(MessageType.PING);
+    }
+
+    /**
+     * Processes the ack and, when the reply carries more than one part, reads
+     * the server's timestamp from part 1 and uses it to update the cache time
+     * offset of the connected distributed system (loner systems only).
+     * @return always null
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "ping");
+      if (msg.getMessageType() != MessageType.REPLY || msg.getNumberOfParts() <= 1) {
+        return null;
+      }
+      // take the receive time before decoding the server's timestamp
+      final long endTime = System.currentTimeMillis();
+      final long serverTime = msg.getPart(1).getLong();
+      // the new clock offset is computed assuming that the server's timestamp was
+      // taken mid-way between when the ping was sent and the reply was
+      // received:
+      //    timestampElapsedTime = (endTime - startTime)/2
+      //    localTime = startTime + timestampElapsedTime
+      //    offsetFromServer = serverTime - localTime
+      // Each timestamp is halved on its own here, so the result can differ
+      // from the exact mid-point by 1 ms when both timestamps are odd.
+      final long newCacheTimeOffset = serverTime - startTime/2 - endTime/2;
+      final InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance();
+      // check for loner so we don't jump time offsets across WAN connections
+      if (ds != null && ds.isLoner()) {
+        ds.getClock().setCacheTimeOffset(null, newCacheTimeOffset, false);
+      }
+      return null;
+    }
+
+    /**
+     * @return always false, whatever the message type
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    /**
+     * @return the value returned by stats.startPing()
+     */
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startPing();
+    }
+
+    /** Reports the end of the send phase to stats.endPingSend. */
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endPingSend(start, hasFailed());
+    }
+
+    /** Reports the end of the whole attempt to stats.endPing. */
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endPing(start, hasTimedOut(), hasFailed());
+    }
+  }
+}