You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:17 UTC

[12/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java
new file mode 100644
index 0000000..b3fdbc0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ContainsKeyOp.java
@@ -0,0 +1,93 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a region containsKey on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class ContainsKeyOp {
+  /**
+   * Does a region entry containsKey on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the entry containsKey on
+   * @param key the entry key to do the containsKey on
+   * @return the result of invoking containsKey on the server
+   */
+  public static boolean execute(ExecutablePool pool,
+                                String region,
+                                Object key,
+                                MODE mode)
+  {
+    AbstractOp op = new ContainsKeyOpImpl(region, key, mode);
+    Boolean result = (Boolean)pool.execute(op);
+    return result.booleanValue();
+  }
+                                                               
+  private ContainsKeyOp() {
+    // no instances allowed
+  }
+  
+  private static class ContainsKeyOpImpl extends AbstractOp {
+    
+    private String region;
+    private Object key;
+    private final MODE mode;
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public ContainsKeyOpImpl(String region,
+                         Object key,
+                         MODE mode) {
+      super(MessageType.CONTAINS_KEY, 3);
+      getMessage().addStringPart(region);
+      getMessage().addStringOrObjPart(key);
+      getMessage().addIntPart(mode.ordinal());
+      this.region = region;
+      this.key = key;
+      this.mode = mode;
+    }
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      return processObjResponse(msg, "containsKey");
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.CONTAINS_KEY_DATA_ERROR;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startContainsKey();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endContainsKeySend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endContainsKey(start, hasTimedOut(), hasFailed());
+    }
+    
+    @Override
+    public String toString() {
+      return "ContainsKeyOp(region=" + region + ";key=" + key+";mode="+mode;
+    }
+  }
+  
+  public enum MODE {
+    KEY,
+    VALUE_FOR_KEY,
+    VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
new file mode 100644
index 0000000..66cf256
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DataSerializerRecoveryListener.java
@@ -0,0 +1,144 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class DataSerializerRecoveryListener extends EndpointManager.EndpointListenerAdapter {
+  private static final Logger logger = LogService.getLogger();
+  
+  private final AtomicInteger endpointCount = new AtomicInteger();
+  protected final InternalPool pool;
+  protected final ScheduledExecutorService background;
+  protected final long pingInterval;
+  protected final Object recoveryScheduledLock = new Object();
+  protected boolean recoveryScheduled;
+  
+  public DataSerializerRecoveryListener(ScheduledExecutorService background, InternalPool pool) {
+    this.pool = pool;
+    this.pingInterval = pool.getPingInterval();
+    this.background = background;
+  }
+  
+  @Override
+  public void endpointCrashed(Endpoint endpoint) {
+    int count = endpointCount.decrementAndGet();
+    if(logger.isDebugEnabled()) {
+      logger.debug("DataSerializerRecoveryTask - EndpointCrashed. Now have {} endpoints", count);
+    }
+  }
+
+  @Override
+  public void endpointNoLongerInUse(Endpoint endpoint) {
+    int count = endpointCount.decrementAndGet();
+    if(logger.isDebugEnabled()) {
+      logger.debug("DataSerializerRecoveryTask - EndpointNoLongerInUse. Now have {} endpoints", count);
+    }
+  }
+
+  @Override
+  public void endpointNowInUse(Endpoint endpoint) {
+    int count  = endpointCount.incrementAndGet();
+    if(logger.isDebugEnabled()) {
+      logger.debug("DataSerializerRecoveryTask - EndpointNowInUse. Now have {} endpoints", count);
+    }
+    if(count == 1) {
+      synchronized(recoveryScheduledLock) {
+        if(!recoveryScheduled) {
+          try {
+            recoveryScheduled = true;
+            background.execute(new RecoveryTask());
+            logger.debug("DataSerializerRecoveryTask - Scheduled Recovery Task");
+          } catch(RejectedExecutionException e) {
+            //ignore, the timer has been cancelled, which means we're shutting down.
+          }
+        }
+      }
+    }
+  }
+  
+  protected class RecoveryTask extends PoolTask {
+
+    @Override
+    public void run2() {
+      if(pool.getCancelCriterion().cancelInProgress() != null) {
+        return;
+      }
+      synchronized(recoveryScheduledLock) {
+        recoveryScheduled = false;
+      }
+      logger.debug("DataSerializerRecoveryTask - Attempting to recover dataSerializers");
+      SerializerAttributesHolder[] holders= InternalDataSerializer.getSerializersForDistribution();
+      if(holders.length == 0) {
+        return;
+      }
+      EventID eventId = InternalDataSerializer.generateEventId();
+      //Fix for bug:40930
+      if (eventId == null) {
+        try {
+          background.schedule(new RecoveryTask(), pingInterval,
+              TimeUnit.MILLISECONDS);
+          recoveryScheduled = true;
+        } catch (RejectedExecutionException e) {
+          pool.getCancelCriterion().checkCancelInProgress(e);
+          throw e;
+        }
+      }
+      else {
+        try {
+          RegisterDataSerializersOp.execute(pool, holders, eventId);
+        } 
+        catch (CancelException e) {
+          throw e;
+        }
+        catch (RejectedExecutionException e) {
+          // This is probably because we've started to shut down.
+          pool.getCancelCriterion().checkCancelInProgress(e);
+          throw e; // weird
+        }
+        catch(Exception e) {
+          pool.getCancelCriterion().checkCancelInProgress(e);
+          
+          // If ClassNotFoundException occurred on server, don't retry
+          Throwable cause = e.getCause();
+          boolean cnfException = false;
+          if (cause instanceof ClassNotFoundException) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.DataSerializerRecoveryListener_ERROR_CLASSNOTFOUNDEXCEPTION,
+                cause.getMessage()));
+            cnfException = true;
+          }
+          
+          if(!recoveryScheduled && !cnfException) {
+            logger.warn(LocalizedMessage.create(
+              LocalizedStrings.DataSerializerRecoveryListener_ERROR_RECOVERING_DATASERIALIZERS),
+              e);
+            background.schedule(new RecoveryTask(), pingInterval, TimeUnit.MILLISECONDS);
+            recoveryScheduled = true;
+          }
+        } finally {
+          pool.releaseThreadLocalConnection();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
new file mode 100644
index 0000000..7dc0eaa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
@@ -0,0 +1,282 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.client.AllConnectionsInUseException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.util.BridgeWriterException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region destroy on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class DestroyOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  public static final int HAS_VERSION_TAG = 0x01;
+  
+  public static final int HAS_ENTRY_NOT_FOUND_PART = 0x02;
+
+  /**
+   * Does a region entry destroy on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the region to do the entry destroy on
+   * @param key the entry key to do the destroy on
+   * @param event the event for this destroy operation
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public static Object execute(ExecutablePool pool, LocalRegion region,
+      Object key,
+      Object expectedOldValue, Operation operation,
+      EntryEventImpl event, Object callbackArg,
+      boolean prSingleHopEnabled) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Preparing DestroyOp for {} operation={}", key, operation);
+    }
+    AbstractOp op = new DestroyOpImpl(region, key, expectedOldValue,
+        operation, event, callbackArg, prSingleHopEnabled);
+    if (prSingleHopEnabled) {
+      ClientMetadataService cms = region.getCache()
+          .getClientMetadataService();
+      ServerLocation server = cms.getBucketServerLocation(region,
+          Operation.DESTROY, key, null, callbackArg);
+      if (server != null) {
+        try {
+          PoolImpl poolImpl = (PoolImpl)pool;
+          boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl
+              .getConnectionCount() >= poolImpl.getMaxConnections()) ? true
+              : false);
+          return pool.executeOn(server, op, true, onlyUseExistingCnx);
+        }
+        catch (AllConnectionsInUseException e) {
+        }
+        catch (ServerConnectivityException e) {
+          if (e instanceof ServerOperationException) {
+            throw e; // fixed 44656
+          }
+          cms.removeBucketServerLocation(server);
+        }
+        catch (BridgeWriterException e) {
+          if (e.getCause() instanceof ServerConnectivityException)
+            cms.removeBucketServerLocation(server);
+        }
+      }
+    }
+    return pool.execute(op);
+  }
+  
+  /**
+   * Does a region entry destroy on a server using the given connection to
+   * communicate with the server.
+   * 
+   * @param con
+   *                the connection to use to send to the server
+   * @param pool
+   *                the pool to use to communicate with the server.
+   * @param region
+   *                the region to do the entry destroy on
+   * @param key
+   *                the entry key to do the destroy on
+   * @param event
+   *                the event for this destroy operation
+   * @param callbackArg
+   *                an optional callback arg to pass to any cache callbacks
+   */
+  public static void execute(Connection con,
+                             ExecutablePool pool,
+                             String region,
+                             Object key,
+                             Object expectedOldValue,
+                             Operation operation,
+                             EntryEventImpl event,
+                             Object callbackArg)
+  {
+    AbstractOp op = new DestroyOpImpl(region, key, expectedOldValue,
+        operation, event, callbackArg);
+    pool.executeOn(con, op);
+  }
+
+  /** this is set if a response is received indicating that the entry was not found on the server */
+  public static boolean TEST_HOOK_ENTRY_NOT_FOUND;
+                                                               
+  private DestroyOp() {
+    // no instances allowed
+  }
+  
+  private static class DestroyOpImpl extends AbstractOp {
+    
+    Object key = null;
+    
+    private LocalRegion region;       
+
+    private Operation operation;
+
+    private boolean prSingleHopEnabled = false;
+    
+    private Object callbackArg;
+    
+    private EntryEventImpl event;
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public DestroyOpImpl(LocalRegion region,
+                         Object key,
+                         Object expectedOldValue,
+                         Operation operation,
+                         EntryEventImpl event,
+                         Object callbackArg,
+                         boolean prSingleHopEnabled) {
+      super(MessageType.DESTROY, callbackArg != null ? 6 : 5);
+      this.key = key;
+      this.region = region ;
+      this.operation = operation;
+      this.prSingleHopEnabled = prSingleHopEnabled;
+      this.callbackArg = callbackArg ;
+      this.event = event;
+      getMessage().addStringPart(region.getFullPath());
+      getMessage().addStringOrObjPart(key);
+      getMessage().addObjPart(expectedOldValue);
+      getMessage().addObjPart(operation==Operation.DESTROY? null : operation); // server interprets null as DESTROY
+      getMessage().addBytesPart(event.getEventId().calcBytes());
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+
+
+    public DestroyOpImpl(String region, Object key, Object expectedOldValue,
+        Operation operation, EntryEventImpl event, 
+        Object callbackArg) {
+      super(MessageType.DESTROY, callbackArg != null ? 6 : 5);
+      this.key = key;
+      this.event = event;
+      getMessage().addStringPart(region);
+      getMessage().addStringOrObjPart(key);
+      getMessage().addObjPart(expectedOldValue);
+      getMessage().addObjPart(operation==Operation.DESTROY? null : operation); // server interprets null as DESTROY
+      getMessage().addBytesPart(event.getEventId().calcBytes());
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+    
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+    
+    
+    @Override
+    protected Object processResponse(Message msg, Connection con) throws Exception {
+      processAck(msg, "destroy");
+      boolean isReply = (msg.getMessageType() == MessageType.REPLY);
+      int partIdx = 0;
+      int flags = 0;
+      if (isReply) {
+        flags = msg.getPart(partIdx++).getInt();
+        if ((flags & HAS_VERSION_TAG) != 0) {
+          VersionTag tag = (VersionTag)msg.getPart(partIdx++).getObject();
+          // we use the client's ID since we apparently don't track the server's ID in connections
+          tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
+          this.event.setVersionTag(tag);
+          if (logger.isDebugEnabled()) {
+            logger.debug("received Destroy response with {}", tag);
+            }
+        } else if (logger.isDebugEnabled()) {
+          logger.debug("received Destroy response with no version tag");
+        }
+      }
+      if (prSingleHopEnabled) {
+        byte version = 0 ;
+//        if (log.fineEnabled()) {
+//          log.fine("reading prSingleHop part #" + (partIdx+1));
+//        }
+        Part part = msg.getPart(partIdx++);
+        byte[] bytesReceived = part.getSerializedForm();
+        if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
+            && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
+          if (this.region != null) {
+            ClientMetadataService cms = null;
+            try {
+              cms = region.getCache().getClientMetadataService();
+              version = cms.getMetaDataVersion(region, Operation.UPDATE,
+                  key, null, callbackArg);
+            }
+            catch (CacheClosedException e) {
+              return null;
+            }
+            if (bytesReceived[0] != version) {
+              cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+            }
+          }
+        }
+      } else {
+        partIdx++; // skip OK byte
+      }
+      boolean entryNotFound = false;
+      if (msg.getMessageType() == MessageType.REPLY && (flags & HAS_ENTRY_NOT_FOUND_PART) != 0) {
+        entryNotFound = (msg.getPart(partIdx++).getInt() == 1);
+        if (logger.isDebugEnabled() && (flags & HAS_ENTRY_NOT_FOUND_PART) != 0) {
+          logger.debug("destroy response has entryNotFound={}", entryNotFound);
+        }
+        if (entryNotFound) {
+          TEST_HOOK_ENTRY_NOT_FOUND = true;
+        }
+      }
+      if (this.operation == Operation.REMOVE && entryNotFound) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("received REMOVE response from server with entryNotFound={}", entryNotFound);
+        }
+        return new EntryNotFoundException(LocalizedStrings.AbstractRegionMap_ENTRY_NOT_FOUND_WITH_EXPECTED_VALUE.toLocalizedString());
+      }
+      return null;
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.DESTROY_DATA_ERROR;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startDestroy();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endDestroySend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endDestroy(start, hasTimedOut(), hasFailed());
+    }
+    @Override
+    public String toString() {
+      return "DestroyOp:"+key;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java
new file mode 100644
index 0000000..f1e3cd9
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyRegionOp.java
@@ -0,0 +1,95 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.EventID;
+
+/**
+ * Does a region destroyRegion (or create) on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class DestroyRegionOp {
+  /**
+   * Does a region destroyRegion on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the destroyRegion on
+   * @param eventId the event id for this destroyRegion
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public static void execute(ExecutablePool pool,
+                             String region,
+                             EventID eventId,
+                             Object callbackArg)
+  {
+    AbstractOp op = new DestroyRegionOpImpl(region, eventId, callbackArg);
+    pool.execute(op);
+  }
+  /**
+   * Does a region destroyRegion on a server using the given connection
+   * to communicate with the server.
+   * @param con the connection to use to send to the server
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the destroyRegion on
+   * @param eventId the event id for this destroyRegion
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  public static void execute(Connection con,
+                             ExecutablePool pool,
+                             String region,
+                             EventID eventId,
+                             Object callbackArg)
+  {
+    AbstractOp op = new DestroyRegionOpImpl(region, eventId, callbackArg);
+    pool.executeOn(con, op);
+  }
+                                                               
+  private DestroyRegionOp() {
+    // no instances allowed
+  }
+  
+  private static class DestroyRegionOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public DestroyRegionOpImpl(String region,
+                     EventID eventId,
+                     Object callbackArg) {
+      super(MessageType.DESTROY_REGION, callbackArg != null ? 3 : 2);
+      getMessage().addStringPart(region);
+      getMessage().addBytesPart(eventId.calcBytes());
+      if (callbackArg != null) {
+        getMessage().addObjPart(callbackArg);
+      }
+    }
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "destroyRegion");
+      return null;
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.DESTROY_REGION_DATA_ERROR;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startDestroyRegion();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endDestroyRegionSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endDestroyRegion(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java
new file mode 100644
index 0000000..25518a3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Endpoint.java
@@ -0,0 +1,102 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Represents a server. Keeps track of information about the specific server
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class Endpoint {
+  
+  private AtomicLong lastExecute = new AtomicLong();
+  private AtomicInteger references = new AtomicInteger();
+  private final ServerLocation location;
+  private final ConnectionStats stats;
+  private final EndpointManagerImpl manager;
+  private final DistributedMember memberId;
+  private volatile boolean closed;
+  
+  Endpoint(EndpointManagerImpl endpointManager, DistributedSystem ds,
+      ServerLocation location, ConnectionStats stats,
+      DistributedMember memberId) {
+    this.manager =endpointManager;
+    this.location = location;
+    this.stats = stats;
+    this.memberId = memberId;
+    updateLastExecute();
+  }
+
+  public void updateLastExecute() {
+    this.lastExecute.set(System.nanoTime());
+  }
+
+  private long getLastExecute() {
+    return lastExecute.get();
+  }
+
+  public boolean timeToPing(long pingIntervalNanos) {
+    long now = System.nanoTime();
+    return getLastExecute() <= (now - pingIntervalNanos);
+  }
+  
+  public void close() {
+    if(!closed) {
+      closed = true;
+    }
+  }
+  
+  public boolean isClosed() {
+    return closed == true;
+  }
+
+  public ConnectionStats getStats() {
+    return stats;
+  }
+
+  void addReference() {
+    references.incrementAndGet();
+  }
+
+  /**
+   * @return true if this was the last reference to the endpoint
+   */
+  public boolean removeReference() {
+    boolean lastReference = (references.decrementAndGet() <= 0);
+    if(lastReference) {
+      manager.endpointNotInUse(this);
+    }
+    return lastReference;
+  }
+
+  /**
+   * @return the location of this server
+   */
+  public ServerLocation getLocation() {
+    return location;
+  }
+
+  @Override
+  public String toString() {
+    return location.toString();
+  }
+  
+  public DistributedMember getMemberId() {
+    return memberId;
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java
new file mode 100644
index 0000000..5d4a1f2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManager.java
@@ -0,0 +1,91 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * The endpoint manager keeps track of which servers we 
+ * are connected to. Other parts of the client code can register
+ * listeners that will be notified about when endpoints are created
+ * or died. For example the connection manager registers a listener
+ * to be notified if a server dies and closes all of it's connections.
+ * @author dsmith
+ *
+ */
+public interface EndpointManager {
+  /**
+   * Get the endpoint for this server and create it if it does not already exist.
+   * This increments the reference count for the endpoint,
+   *  so you should call {@link Endpoint#removeReference()} after
+   *  the endpoint is no longer in use.
+   */
+  Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId);
+
+  /**
+   * Indicate that a particular server has crashed. All of the listeners will be notified
+   * that the server has crashed.
+   * @param endpoint
+   */
+  void serverCrashed(Endpoint endpoint);
+
+  /**
+   * Get the map of all endpoints currently in use.
+   * @return a map for ServerLocation->Endpoint
+   */
+  Map<ServerLocation, Endpoint> getEndpointMap();
+
+  void close();
+
+  /** Add a listener which will be notified when the state of
+   * an endpoint changes.
+   */
+  void addListener(EndpointManager.EndpointListener listener);
+
+  /**
+   * Remove a listener. 
+   */
+  void removeListener(EndpointManager.EndpointListener listener);
+  
+  /**
+   * Get the stats for all of the servers we ever connected too.
+   * @return a map of ServerLocation-> ConnectionStats
+   */
+  public Map getAllStats();
+
+  /**
+   * Test hook that returns the number of servers we currently have connections to.
+   */
+  public int getConnectedServerCount();
+
+  public static interface EndpointListener {
+    
+    void endpointNoLongerInUse(Endpoint endpoint);
+    
+    void endpointCrashed(Endpoint endpoint);
+    
+    void endpointNowInUse(Endpoint endpoint);
+  }
+
+  public static class EndpointListenerAdapter implements EndpointListener {
+  
+    public void endpointCrashed(Endpoint endpoint) {
+    }
+  
+    public void endpointNoLongerInUse(Endpoint endpoint) {
+    }
+  
+    public void endpointNowInUse(Endpoint endpoint) {
+    }
+  }
+
+  public String getPoolName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java
new file mode 100644
index 0000000..5bf3a48
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/EndpointManagerImpl.java
@@ -0,0 +1,301 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.execute.TransactionFunctionService;
+import com.gemstone.gemfire.internal.cache.tier.InternalBridgeMembership;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * @author dsmith
+ *
+ */
+public class EndpointManagerImpl implements EndpointManager {
+  private static final Logger logger = LogService.getLogger();
+  
+  private volatile Map<ServerLocation, Endpoint> endpointMap = Collections.emptyMap();
+  private final Map/*<ServerLocation, ConnectionStats>*/<ServerLocation, ConnectionStats> statMap = new HashMap<ServerLocation, ConnectionStats>();
+  private final DistributedSystem ds;
+  private final String poolName;
+  private final EndpointListenerBroadcaster listener = new EndpointListenerBroadcaster();
+  protected final CancelCriterion cancelCriterion;
+  private final PoolStats poolStats;
+  
+  public EndpointManagerImpl(String poolName, DistributedSystem ds,CancelCriterion cancelCriterion, PoolStats poolStats) {
+    this.ds = ds;
+    this.poolName = poolName;
+    this.cancelCriterion = cancelCriterion;
+    this.poolStats = poolStats;
+    listener.addListener(new EndpointListenerForBridgeMembership());
+    listener.addListener(new TransactionFunctionService.ListenerForTransactionFunctionService());
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#referenceEndpoint(com.gemstone.gemfire.distributed.internal.ServerLocation)
+   */
+  public Endpoint referenceEndpoint(ServerLocation server, DistributedMember memberId) {
+    //logger.warn("REFENDPOINT server:"+server+" memberId:"+memberId);
+    Endpoint endpoint = endpointMap.get(server);
+    boolean addedEndpoint = false;
+    if(endpoint == null || endpoint.isClosed()) {
+      synchronized(this) {
+        endpoint = endpointMap.get(server);
+        if(endpoint == null || endpoint.isClosed()) {
+          ConnectionStats stats  = getStats(server);
+          Map<ServerLocation, Endpoint> endpointMapTemp = new HashMap<ServerLocation, Endpoint>(endpointMap);
+          endpoint = new Endpoint(this, ds, server, stats, memberId);
+          endpointMapTemp.put(server, endpoint);
+          endpointMap = Collections.unmodifiableMap(endpointMapTemp);
+          addedEndpoint = true;
+          poolStats.setServerCount(endpointMap.size());
+        }
+      }
+    }
+    
+    endpoint.addReference();
+    
+    if(addedEndpoint) {
+      //logger.warn("EMANFIRE2:JOIN:"+endpoint.getLocation()+" mid:"+endpoint.getMemberId());
+      listener.endpointNowInUse(endpoint);
+    } else {
+      //logger.warn("EMANFIRE33:NOJOIN:"+endpoint.getLocation()+" mid:"+endpoint.getMemberId());
+    }
+    
+    return endpoint;
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#serverCrashed(com.gemstone.gemfire.cache.client.internal.Endpoint)
+   */
+  public void serverCrashed(Endpoint endpoint) {
+    removeEndpoint(endpoint, true);
+  }
+  
+  void endpointNotInUse(Endpoint endpoint) {
+    removeEndpoint(endpoint, false);
+  }
+  
+  /** Used by Endpoint only, when the reference count for this endpoint reaches 0 */
+  private void removeEndpoint(Endpoint endpoint, boolean crashed) {
+    endpoint.close();
+    boolean removedEndpoint = false;
+    synchronized(this) {
+      Map<ServerLocation, Endpoint> endpointMapTemp = new HashMap<ServerLocation, Endpoint>(endpointMap);
+      endpoint = endpointMapTemp.remove(endpoint.getLocation());
+      if(endpoint != null) {
+        endpointMap = Collections.unmodifiableMap(endpointMapTemp);
+        removedEndpoint = true;
+      }
+      poolStats.setServerCount(endpointMap.size());
+    }
+    if(removedEndpoint) {
+      PoolImpl pool = (PoolImpl)PoolManager.find(this.poolName);
+      if (pool != null && pool.getMultiuserAuthentication()) {
+        int size = 0;
+        ArrayList<ProxyCache> proxyCaches = pool.getProxyCacheList();
+        synchronized (proxyCaches) {
+        for (ProxyCache proxyCache : proxyCaches) {
+          try {
+            Long userId = proxyCache.getUserAttributes().getServerToId().remove(
+                endpoint.getLocation());
+            if (userId != null) {
+              ++size;
+            }
+          } catch (CacheClosedException cce) {
+            // If this call is triggered by a Cache.close(), then this can be
+            // expected.
+          }
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("EndpointManagerImpl.removeEndpoint() Removed server {} from {} user's ProxyCache", endpoint.getLocation(), size);
+        }
+        }
+        UserAttributes ua = UserAttributes.userAttributes.get();
+        if (ua != null) {
+          Long userId = ua.getServerToId().remove(endpoint.getLocation());
+          if (userId != null && logger.isDebugEnabled()) {
+            logger.debug("EndpointManagerImpl.removeEndpoint() Removed server {} from thread local variable", endpoint.getLocation());
+          }
+        }
+      } else if (pool != null && !pool.getMultiuserAuthentication()) {
+        endpoint.getLocation().setUserId(-1);
+      }
+      if(crashed) {
+        listener.endpointCrashed(endpoint);
+      }
+      else {
+        listener.endpointNoLongerInUse(endpoint);
+      }
+    }
+  }
+  
+  
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#getEndpointMap()
+   */
+  public Map<ServerLocation, Endpoint> getEndpointMap() {
+    return endpointMap;
+  }
+
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#close()
+   */
+  public synchronized void close() {
+    for(Iterator<ConnectionStats> itr = statMap.values().iterator(); itr.hasNext(); ) {
+      ConnectionStats stats = itr.next();
+      stats.close();
+    }
+    
+    statMap.clear();
+    endpointMap = Collections.emptyMap();
+    listener.clear();
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#addListener(com.gemstone.gemfire.cache.client.internal.EndpointManagerImpl.EndpointListener)
+   */
+  public void addListener(EndpointManager.EndpointListener listener) {
+    this.listener.addListener(listener);
+  }
+  
+  /* (non-Javadoc)
+   * @see com.gemstone.gemfire.cache.client.internal.EndpointManager#removeListener(com.gemstone.gemfire.cache.client.internal.EndpointManagerImpl.EndpointListener)
+   */
+  public void removeListener(EndpointManager.EndpointListener listener) {
+    this.listener.removeListener(listener);
+  }
+  
+  private synchronized ConnectionStats getStats(ServerLocation location) {
+    ConnectionStats stats = statMap.get(location);
+    if(stats == null) {
+      String statName = poolName + "-" + location.toString();
+      PoolImpl pool = (PoolImpl)PoolManager.find(this.poolName);
+      if (pool != null) {
+        if (pool.getGatewaySender() != null) {
+          stats = new ConnectionStats(new DummyStatisticsFactory(), statName,
+              this.poolStats/*, this.gatewayStats*/);
+        }
+      }
+      if (stats == null) {
+        stats = new ConnectionStats(ds, statName, this.poolStats/*,
+            this.gatewayStats*/);
+      }
+      statMap.put(location, stats);
+    }
+    
+    return stats;
+  }
+  
+  public synchronized Map<ServerLocation, ConnectionStats> getAllStats() {
+    return new HashMap<ServerLocation, ConnectionStats>(statMap);
+  }
+
+  public int getConnectedServerCount() {
+    return getEndpointMap().size();
+  }
+  
+  public static void loadEmergencyClasses() {
+    //do nothing
+  }
+  
+  protected static class EndpointListenerBroadcaster implements EndpointManager.EndpointListener {
+  
+    private volatile Set/*<EndpointListener>*/<EndpointListener> endpointListeners = Collections.emptySet();
+    
+    public synchronized void addListener(EndpointManager.EndpointListener listener) {
+      HashSet<EndpointListener> tmpListeners = new HashSet<EndpointListener>(endpointListeners);
+      tmpListeners.add(listener);
+      endpointListeners = Collections.unmodifiableSet(tmpListeners);
+    }
+
+    public synchronized void clear() {
+      endpointListeners = Collections.emptySet();
+    }
+
+    public void removeListener(EndpointManager.EndpointListener listener) {
+      HashSet<EndpointListener> tmpListeners = new HashSet<EndpointListener>(endpointListeners);
+      tmpListeners.remove(listener);
+      endpointListeners = Collections.unmodifiableSet(tmpListeners);
+    }
+
+    public void endpointCrashed(Endpoint endpoint) {
+      for(Iterator<EndpointListener> itr = endpointListeners.iterator(); itr.hasNext(); ) {
+        EndpointManager.EndpointListener listener = itr.next();
+        listener.endpointCrashed(endpoint);
+      }
+    }
+
+    public void endpointNoLongerInUse(Endpoint endpoint) {
+      for(Iterator<EndpointListener> itr = endpointListeners.iterator(); itr.hasNext(); ) {
+        EndpointManager.EndpointListener listener = itr.next();
+        listener.endpointNoLongerInUse(endpoint);
+      }
+    }
+
+    public void endpointNowInUse(Endpoint endpoint) {
+      //logger.warn("HIGHUP:JOIN:"+endpoint.getLocation());
+      for(Iterator<EndpointListener> itr = endpointListeners.iterator(); itr.hasNext(); ) {
+        EndpointManager.EndpointListener listener = itr.next();
+        listener.endpointNowInUse(endpoint);
+      }
+    }
+  }
+  
+  
+  
+  public class EndpointListenerForBridgeMembership implements EndpointManager.EndpointListener {
+    
+    public void endpointCrashed(Endpoint endpoint) {
+      if(endpoint.getMemberId()==null || cancelCriterion.cancelInProgress()!=null) {
+        return;
+      }
+      //logger.warn("EMANFIRE:CRASH:"+endpoint.getLocation());
+      InternalBridgeMembership.notifyCrashed(endpoint.getMemberId(), false);
+    }
+
+    public void endpointNoLongerInUse(Endpoint endpoint) {
+      if(endpoint.getMemberId()==null || cancelCriterion.cancelInProgress()!=null) {
+        return;
+      }
+      //logger.warn("EMANFIRE:LEFT:"+endpoint.getLocation());
+      InternalBridgeMembership.notifyLeft(endpoint.getMemberId(), false);
+    }
+
+    public void endpointNowInUse(Endpoint endpoint) {
+      if(cancelCriterion.cancelInProgress()!=null) {
+        return;
+      }
+      //logger.warn("EMANFIRE:JOIN:"+endpoint.getLocation()+" mid:"+endpoint.getMemberId(),new Exception());
+      InternalBridgeMembership.notifyJoined(endpoint.getMemberId(), false);
+    }
+  }
+
+  public String getPoolName() {
+    return poolName;
+  }  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java
new file mode 100644
index 0000000..1477a7e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecutablePool.java
@@ -0,0 +1,141 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+
+import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
+import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Provides methods to execute AbstractOp instances on a client pool.
+ * @author darrel
+ * @since 5.7
+ */
+public interface ExecutablePool {
+  /**
+   * Execute the given op on the servers that this pool connects to.
+   * This method is responsible for retrying the op if an attempt fails.
+   * It will only execute it once and on one server.
+   * @param op the operation to execute
+   * @return the result of execution if any; null if not
+   * @since 5.7
+   */
+  public Object execute(Op op);
+  
+  /**
+   * Execute the given op on the servers that this pool connects to.
+   * This method is responsible for retrying the op if an attempt fails.
+   * It will only execute it once and on one server.
+   * @param op the operation to execute
+   * @return the result of execution if any; null if not
+   * @since 5.7
+   */
+  public Object execute(Op op, int retryAttempts);
+  
+  /**
+   * Execute the given op on all the servers that have server-to-client queues
+   * for this pool The last exception from any server will be thrown if the op fails.
+   * The op is executed with the primary first, followed by the backups.
+   * 
+   * @param op
+   *                the operation to execute.
+   * @throws NoSubscriptionServersAvailableException if we have no queue server
+   * @throws SubscriptionNotEnabledException If the pool does not have queues enabled
+   */
+  public void executeOnAllQueueServers(Op op) throws NoSubscriptionServersAvailableException, SubscriptionNotEnabledException;
+  /**
+   * Execute the given op on all the servers that have server-to-client queues
+   * for this pool. The op will be executed on all backups, and then the primary.
+   * This method will block until a primary is available.
+   * 
+   * @param op
+   *                the operation to execute
+   * @return The result from the primary server.
+   * @throws NoSubscriptionServersAvailableException if we have no queue server
+   * @throws SubscriptionNotEnabledException If the pool does not have queues enabled
+   * @since 5.7
+   */
+  public Object executeOnQueuesAndReturnPrimaryResult(Op op)  throws NoSubscriptionServersAvailableException, SubscriptionNotEnabledException;
+  /**
+   * Execute the given op on the given server.
+   * @param server the server to do the execution on
+   * @param op the operation to execute
+   * @return the result of execution if any; null if not
+   */
+  public Object executeOn(ServerLocation server, Op op);
+  /**
+   * Execute the given op on the given server.
+   * @param server the server to do the execution on
+   * @param op the operation to execute
+   * @param accessed true if the connection is accessed by this execute
+   * @return the result of execution if any; null if not
+   */
+  public Object executeOn(ServerLocation server, Op op, boolean accessed,boolean onlyUseExistingCnx);
+  /**
+   * Execute the given op on the given connection.
+   * @param con the connection to do the execution on
+   * @param op the operation to execute
+   * @return the result of execution if any; null if not
+   */
+  public Object executeOn(Connection con, Op op);
+  /**
+   * Execute the given op on the given connection.
+   * @param con the connection to do the execution on
+   * @param op the operation to execute
+   * @param timeoutFatal true if a timeout exception should be treated as a fatal one
+   * @return the result of execution if any; null if not
+   */
+  public Object executeOn(Connection con, Op op, boolean timeoutFatal);
+  /**
+   * Execute the given op on the current primary server.
+   * @param op the operation to execute
+   * @return the result of execution if any; null if not
+   */
+  public Object executeOnPrimary(Op op);
+  public RegisterInterestTracker getRITracker();
+  
+  /**
+   * Release the connection held by the calling
+   * thread if we're using thread local connections
+   */
+  void releaseThreadLocalConnection();
+  
+  /**
+   * The calling thread will connect to only one server for
+   * executing all ops until it calls {@link #releaseServerAffinity()}
+   * @param allowFailover true if we want to failover to another
+   * server when the first server is unreachable. Affinity to the
+   * new server will be maintained
+   * @since 6.6
+   */
+  public void setupServerAffinity(boolean allowFailover);
+  
+  /**
+   * Release the server affinity established
+   * by {@link #setupServerAffinity(boolean)}
+   * @since 6.6
+   */
+  public void releaseServerAffinity();
+  
+  /**
+   * When server affinity is enabled by this thread, returns the server against which all ops in this thread are performed 
+   * @return location of the affinity server
+   * @since 6.6
+   * @see ExecutablePool#setupServerAffinity(boolean) 
+   */
+  public ServerLocation getServerAffinityLocation();
+  
+  /**
+   * All subsequent operations by this thread will be performed on
+   * the given ServerLocation. Used for resuming suspended transactions.
+   * @param serverLocation
+   * @since 6.6
+   */
+  public void setServerAffinityLocation(ServerLocation serverLocation);
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java
new file mode 100644
index 0000000..bced6de
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionHelper.java
@@ -0,0 +1,21 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+public class ExecuteFunctionHelper {
+
+  public final static byte BUCKETS_AS_FILTER_MASK = 0x02;
+  public final static byte IS_REXECUTE_MASK = 0x01;
+  
+  static byte createFlags(boolean executeOnBucketSet, byte isReExecute) {
+    byte flags = executeOnBucketSet ? 
+        (byte)(0x00 | BUCKETS_AS_FILTER_MASK) : 0x00;
+    flags = isReExecute == 1? (byte)(flags | IS_REXECUTE_MASK) : flags;      
+    return flags;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java
new file mode 100755
index 0000000..706e8c4
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionNoAckOp.java
@@ -0,0 +1,234 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Does a Execution of function on server (possibly without region/cache) 
+ * It does not get the resulf from the server (follows Fire&Forget approch)
+ * @author Suranjan Kumar
+ * @since 5.8Beta
+ */
+public class ExecuteFunctionNoAckOp {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private ExecuteFunctionNoAckOp() {
+    // no instances allowed
+  }
+
+  /**
+   * Does a execute Function on a server using connections from the given pool
+   * to communicate with the server.
+   * 
+   * @param pool
+   *                the pool to use to communicate with the server.
+   * @param function
+   *                of the function to be executed
+   * @param args
+   *                specified arguments to the application function
+   */
+  public static void execute(PoolImpl pool, Function function,
+      Object args, MemberMappedArgument memberMappedArg,
+      boolean allServers, byte hasResult, boolean isFnSerializationReqd, String[] groups) {
+    List servers = null;
+    AbstractOp op = new ExecuteFunctionNoAckOpImpl(function, args, memberMappedArg,
+        hasResult, isFnSerializationReqd, groups, allServers);
+    try {
+      // In case of allServers getCurrentServers and call
+      // executeOn(ServerLocation server, Op op)
+      if (allServers && groups.length == 0) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to all servers using pool: " +pool);
+        }
+        servers = pool.getCurrentServers();
+        Iterator i = servers.iterator();
+        while (i.hasNext()) {          
+          pool.executeOn((ServerLocation)i.next(), op);
+        }
+      }
+      else { 
+        if (logger.isDebugEnabled()) {
+          logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to server using pool: " + pool + " with groups:" + Arrays.toString(groups) + " all members:" + allServers);
+        }
+        pool.execute(op,0);       
+      }
+    }
+    catch (Exception ex) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message:" + op.getMessage() + " to server using pool: " +pool, ex);
+      }
+      if (ex.getMessage() != null)
+        throw new FunctionException(ex.getMessage(), ex);
+      else
+        throw new FunctionException(
+            "Unexpected exception during function execution:", ex);
+    }
+  }
+
+  public static void execute(PoolImpl pool, String functionId,
+      Object args, MemberMappedArgument memberMappedArg,
+      boolean allServers, byte hasResult, boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite, String[] groups) {
+    List servers = null;
+    AbstractOp op = new ExecuteFunctionNoAckOpImpl(functionId, args, memberMappedArg,
+        hasResult, isFnSerializationReqd, isHA, optimizeForWrite, groups, allServers);
+    try {
+      // In case of allServers getCurrentServers and call
+      // executeOn(ServerLocation server, Op op)
+      if (allServers && groups.length == 0) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to all servers using pool: " +pool);
+        }
+        servers = pool.getCurrentServers();
+        Iterator i = servers.iterator();
+        while (i.hasNext()) {          
+          pool.executeOn((ServerLocation)i.next(), op);
+        }
+      }
+      else {        
+        if (logger.isDebugEnabled()) {
+          logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to server using pool: " + pool + " with groups:" + Arrays.toString(groups) + " all members:" + allServers);
+        }
+        pool.execute(op,0);       
+      }
+    }
+    catch (Exception ex) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ExecuteFunctionNoAckOp#execute : Exception occured while Sending Function Execution Message:" + op.getMessage() + " to server using pool: " +pool, ex);
+      }
+      if (ex.getMessage() != null)
+        throw new FunctionException(ex.getMessage(), ex);
+      else
+        throw new FunctionException(
+            "Unexpected exception during function execution:", ex);
+    }
+  }
+  
+  private static class ExecuteFunctionNoAckOpImpl extends AbstractOp {
+
+    /**
+     * number of parts in the request message
+     */
+    private static final int MSG_PARTS = 6;
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException
+     *                 if serialization fails
+     */
+    public ExecuteFunctionNoAckOpImpl(Function function, Object args,
+        MemberMappedArgument memberMappedArg, byte hasResult,
+        boolean isFnSerializationReqd, String[] groups, boolean allMembers) {
+      super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+      byte functionState = AbstractExecution.getFunctionState(function.isHA(),
+          function.hasResult(), function.optimizeForWrite());
+      getMessage().addBytesPart(new byte[]{functionState});
+      if(isFnSerializationReqd){
+        getMessage().addStringOrObjPart(function); 
+      }
+      else{
+        getMessage().addStringOrObjPart(function.getId()); 
+      }
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      getMessage().addObjPart(groups);
+      getMessage().addBytesPart(ExecuteFunctionOp.getByteArrayForFlags(allMembers));
+    }
+
+    /**
+     * @param functionId
+     * @param args
+     * @param memberMappedArg
+     * @param hasResult
+     * @param isFnSerializationReqd
+     * @param isHA
+     * @param optimizeForWrite
+     */
+    public ExecuteFunctionNoAckOpImpl(String functionId, Object args,
+        MemberMappedArgument memberMappedArg, byte hasResult,
+        boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite,
+        String[] groups, boolean allMembers) {
+      super(MessageType.EXECUTE_FUNCTION, MSG_PARTS);
+      getMessage().addBytesPart(
+          new byte[] { AbstractExecution.getFunctionState(isHA,
+              hasResult == (byte)1 ? true : false, optimizeForWrite) });
+      getMessage().addStringOrObjPart(functionId);
+      getMessage().addObjPart(args);
+      getMessage().addObjPart(memberMappedArg);
+      getMessage().addObjPart(groups);
+      getMessage().addBytesPart(ExecuteFunctionOp.getByteArrayForFlags(allMembers));
+    }
+
+    @Override  
+    protected Object processResponse(Message msg) throws Exception {
+      final int msgType = msg.getMessageType();
+      if (msgType == MessageType.REPLY) {
+        return null;
+      }
+      else {
+        Part part = msg.getPart(0);
+        if (msgType == MessageType.EXCEPTION) {
+          Throwable t = (Throwable)part.getObject();
+          logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION), t);
+        }
+        else if (isErrorResponse(msgType)) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.EXECUTE_FUNCTION_NO_HAS_RESULT_RECEIVED_EXCEPTION)); // TODO:LOG:FIXED: was ", part.getString());" which makes no sense
+        }
+        else {
+          throw new InternalGemFireError("Unexpected message type "
+              + MessageType.getString(msgType));
+        }
+        return null;
+      }
+    }
+
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.EXECUTE_FUNCTION_ERROR;
+    }
+
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startExecuteFunction();
+    }
+
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunctionSend(start, hasFailed());
+    }
+
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endExecuteFunction(start, hasTimedOut(), hasFailed());
+    }
+
+    @Override  
+    protected Message createResponseMessage() {
+      return new Message(1, Version.CURRENT);
+    }
+  }
+
+}