You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:10 UTC

[05/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
new file mode 100644
index 0000000..f3c9f03
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInstantiatorsOp.java
@@ -0,0 +1,171 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.Instantiator;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+/**
+ * Register a bunch of instantiators on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInstantiatorsOp {
+  /**
+   * Sends a register-instantiators request for the supplied instantiators,
+   * borrowing a connection from the given pool to reach a server.
+   *
+   * @param pool the pool to use to communicate with the server.
+   * @param instantiators the instantiators to register
+   * @param eventId the id of this event
+   */
+  public static void execute(ExecutablePool pool,
+                             Instantiator[] instantiators,
+                             EventID eventId)
+  {
+    final AbstractOp registerOp =
+        new RegisterInstantiatorsOpImpl(instantiators, eventId);
+    // NOTE(review): the second argument is presumably a retry limit -- confirm
+    // against ExecutablePool.execute.
+    pool.execute(registerOp, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Sends a register-instantiators request built from the supplied holders,
+   * borrowing a connection from the given pool to reach a server.
+   *
+   * @param pool
+   *          the pool to use to communicate with the server.
+   * @param holders
+   *          the entries to register; each element is either an
+   *          {@link Instantiator} or an {@link InstantiatorAttributesHolder}
+   *          containing info about the instantiator to register
+   * @param eventId
+   *          the id of this event
+   */
+  public static void execute(ExecutablePool pool,
+      Object[] holders, EventID eventId) {
+    final AbstractOp registerOp =
+        new RegisterInstantiatorsOpImpl(holders, eventId);
+    // NOTE(review): the second argument is presumably a retry limit -- confirm
+    // against ExecutablePool.execute.
+    pool.execute(registerOp, Integer.MAX_VALUE);
+  }
+
+  /** Not instantiable: this class only exposes static entry points. */
+  private RegisterInstantiatorsOp() {
+  }
+  
+  private static class RegisterInstantiatorsOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public RegisterInstantiatorsOpImpl(Instantiator[] instantiators,
+                                       EventID eventId) {
+      super(MessageType.REGISTER_INSTANTIATORS, instantiators.length * 3 + 1);
+      for(int i = 0; i < instantiators.length; i++) {
+        Instantiator instantiator = instantiators[i];
+         // strip '.class' off these class names
+        String className = instantiator.getClass().toString().substring(6);
+        String instantiatedClassName = instantiator.getInstantiatedClass().toString().substring(6);
+        try {
+          getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
+          getMessage().addBytesPart(BlobHelper.serializeToBlob(instantiatedClassName));
+        } catch (IOException ex) {
+          throw new SerializationException("failed serializing object", ex);
+        }
+        getMessage().addIntPart(instantiator.getId());
+      }
+      getMessage().addBytesPart(eventId.calcBytes());
+//     // // CALLBACK FOR TESTING PURPOSE ONLY ////
+      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.beforeSendingToServer(eventId);
+      }
+    }
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException
+     *           if serialization fails
+     */
+    public RegisterInstantiatorsOpImpl(Object[] holders,
+        EventID eventId) {
+      super(MessageType.REGISTER_INSTANTIATORS, holders.length * 3 + 1);
+      for (Object obj : holders) {
+        String instantiatorClassName = null;
+        String instantiatedClassName = null;
+        int id = 0;
+        if (obj instanceof Instantiator) {
+          instantiatorClassName = ((Instantiator)obj).getClass().getName();
+          instantiatedClassName = ((Instantiator)obj).getInstantiatedClass()
+              .getName();
+          id = ((Instantiator)obj).getId();
+        } else {
+          instantiatorClassName = ((InstantiatorAttributesHolder)obj)
+              .getInstantiatorClassName();
+          instantiatedClassName = ((InstantiatorAttributesHolder)obj)
+              .getInstantiatedClassName();
+          id = ((InstantiatorAttributesHolder)obj).getId();
+        }
+        try {
+          getMessage().addBytesPart(
+              BlobHelper.serializeToBlob(instantiatorClassName));
+          getMessage().addBytesPart(
+              BlobHelper.serializeToBlob(instantiatedClassName));
+        } catch (IOException ex) {
+          throw new SerializationException("failed serializing object", ex);
+        }
+        getMessage().addIntPart(id);
+      }
+      getMessage().addBytesPart(eventId.calcBytes());
+      // // // CALLBACK FOR TESTING PURPOSE ONLY ////
+      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.beforeSendingToServer(eventId);
+      }
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "registerInstantiators");
+      return null;
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startRegisterInstantiators();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterInstantiatorsSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterInstantiators(start, hasTimedOut(), hasFailed());
+    }
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java
new file mode 100644
index 0000000..873f3d6
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestListOp.java
@@ -0,0 +1,138 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.client.internal.RegisterInterestOp.RegisterInterestOpImpl;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+
+/**
+ * Does a region registerInterestList on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInterestListOp {
+  /**
+   * Performs a region registerInterestList, using connections from the given
+   * pool to communicate with the server.
+   *
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the registerInterestList on
+   * @param keys list of keys we are interested in
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates flag forwarded to the server as a
+   *          one-byte part (presumably: deliver updates as invalidates --
+   *          TODO confirm)
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys, as returned by
+   *         {@code pool.executeOnQueuesAndReturnPrimaryResult}
+   */
+  public static List execute(ExecutablePool pool,
+                             String region,
+                             List keys,
+                             InterestResultPolicy policy,
+                             boolean isDurable,
+                             boolean receiveUpdatesAsInvalidates,
+                             byte regionDataPolicy)
+  {
+    final AbstractOp interestOp = new RegisterInterestListOpImpl(region, keys,
+        policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    final Object result =
+        pool.executeOnQueuesAndReturnPrimaryResult(interestOp);
+    return (List) result;
+  }
+                                                               
+  /** Not instantiable: this class only exposes static entry points. */
+  private RegisterInterestListOp() {
+  }
+  /**
+   * Performs a region registerInterestList against one specific server
+   * location, using connections from the given pool.
+   *
+   * @param sl the server to do the register interest on.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the registerInterestList on
+   * @param keys list of keys we are interested in
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates flag forwarded to the server as a
+   *          one-byte part (presumably: deliver updates as invalidates --
+   *          TODO confirm)
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public static List executeOn(ServerLocation sl,
+                               ExecutablePool pool,
+                               String region,
+                               List keys,
+                               InterestResultPolicy policy,
+                               boolean isDurable,
+                               boolean receiveUpdatesAsInvalidates,
+                               byte regionDataPolicy)
+  {
+    final AbstractOp interestOp = new RegisterInterestListOpImpl(region, keys,
+        policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    final Object result = pool.executeOn(sl, interestOp);
+    return (List) result;
+  }
+
+  
+  /**
+   * Performs a region registerInterestList over one specific connection,
+   * executed through the given pool.
+   *
+   * @param conn the connection to do the register interest on.
+   * @param pool the pool used to execute the operation on that connection.
+   * @param region the name of the region to do the registerInterestList on
+   * @param keys list of keys we are interested in
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates flag forwarded to the server as a
+   *          one-byte part (presumably: deliver updates as invalidates --
+   *          TODO confirm)
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public static List executeOn(Connection conn,
+                               ExecutablePool pool,
+                               String region,
+                               List keys,
+                               InterestResultPolicy policy,
+                               boolean isDurable,
+                               boolean receiveUpdatesAsInvalidates,
+                               byte regionDataPolicy)
+  {
+    final AbstractOp interestOp = new RegisterInterestListOpImpl(region, keys,
+        policy, isDurable, receiveUpdatesAsInvalidates, regionDataPolicy);
+    final Object result = pool.executeOn(conn, interestOp);
+    return (List) result;
+  }
+  
+  /**
+   * The REGISTER_INTEREST_LIST request. Response handling is inherited from
+   * RegisterInterestOpImpl; only the message layout and the op name differ.
+   */
+  private static class RegisterInterestListOpImpl extends RegisterInterestOpImpl {
+    /**
+     * Builds the six-part message: region name, result policy, durable flag,
+     * key list, updates-as-invalidates flag, and the data-policy pair.
+     *
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public RegisterInterestListOpImpl(String region,
+                                      List keys,
+                                      InterestResultPolicy policy,
+                                      boolean isDurable,
+                                      boolean receiveUpdatesAsInvalidates,
+                                      byte regionDataPolicy) {
+      super(region, MessageType.REGISTER_INTEREST_LIST, 6);
+      getMessage().addStringPart(region);
+      getMessage().addObjPart(policy);
+
+      // Durable flag, sent as a single byte (0x01 = durable).
+      final byte[] durablePart = new byte[1];
+      if (isDurable) {
+        durablePart[0] = 0x01;
+      }
+      getMessage().addBytesPart(durablePart);
+
+      // Set chunk size of HDOS for keys before the key list is added.
+      final int keyChunkSize = keys.size() * 16;
+      getMessage().setChunkSize(keyChunkSize);
+      getMessage().addObjPart(keys);
+
+      // Updates-as-invalidates flag, sent as a single byte.
+      final byte[] notifyPart = new byte[1];
+      if (receiveUpdatesAsInvalidates) {
+        notifyPart[0] = 0x01;
+      }
+      getMessage().addBytesPart(notifyPart);
+
+      // The trailing 1 tells the server to serialize values in
+      // VersionObjectList. Java clients always expect serializeValues to be
+      // true there, unlike Native clients. This was being sent as part of
+      // GetAllOp prior to fixing #43684.
+      final byte[] dataPolicyPart = {regionDataPolicy, (byte)0x01};
+      getMessage().addBytesPart(dataPolicyPart);
+    }
+
+    // The stats callbacks are inherited, so this op reuses the same stats
+    // used by RegisterInterestOpImpl.
+    @Override
+    protected String getOpName() {
+      return "registerInterestList";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java
new file mode 100644
index 0000000..db0a47f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestOp.java
@@ -0,0 +1,287 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Does a region registerInterest on a server
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInterestOp {
+  /**
+   * Performs a region registerInterest, using connections from the given pool
+   * to communicate with the server.
+   *
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the registerInterest on
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates flag forwarded to the server as a
+   *          one-byte part (presumably: deliver updates as invalidates --
+   *          TODO confirm)
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys, as returned by
+   *         {@code pool.executeOnQueuesAndReturnPrimaryResult}
+   */
+  public static List execute(ExecutablePool pool,
+                             String region,
+                             Object key,
+                             int interestType,
+                             InterestResultPolicy policy,
+                             boolean isDurable,
+                             boolean receiveUpdatesAsInvalidates,
+                             byte regionDataPolicy)
+  {
+    final AbstractOp interestOp = new RegisterInterestOpImpl(region, key,
+        interestType, policy, isDurable, receiveUpdatesAsInvalidates,
+        regionDataPolicy);
+    final Object result =
+        pool.executeOnQueuesAndReturnPrimaryResult(interestOp);
+    return (List) result;
+  }
+                                                               
+  /**
+   * Performs a region registerInterest against one specific server location,
+   * using connections from the given pool.
+   *
+   * @param sl the server to do the register interest on.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the name of the region to do the registerInterest on
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates flag forwarded to the server as a
+   *          one-byte part (presumably: deliver updates as invalidates --
+   *          TODO confirm)
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public static List executeOn(ServerLocation sl,
+                               ExecutablePool pool,
+                               String region,
+                               Object key,
+                               int interestType,
+                               InterestResultPolicy policy,
+                               boolean isDurable,
+                               boolean receiveUpdatesAsInvalidates,
+                               byte regionDataPolicy)
+  {
+    final AbstractOp interestOp = new RegisterInterestOpImpl(region, key,
+        interestType, policy, isDurable, receiveUpdatesAsInvalidates,
+        regionDataPolicy);
+    final Object result = pool.executeOn(sl, interestOp);
+    return (List) result;
+  }
+
+  
+  /**
+   * Performs a region registerInterest over one specific connection, executed
+   * through the given pool.
+   *
+   * @param conn the connection to do the register interest on.
+   * @param pool the pool used to execute the operation on that connection.
+   * @param region the name of the region to do the registerInterest on
+   * @param key describes what we are interested in
+   * @param interestType the {@link InterestType} for this registration
+   * @param policy the interest result policy for this registration
+   * @param isDurable true if this registration is durable
+   * @param receiveUpdatesAsInvalidates flag forwarded to the server as a
+   *          one-byte part (presumably: deliver updates as invalidates --
+   *          TODO confirm)
+   * @param regionDataPolicy the data policy ordinal of the region
+   * @return list of keys
+   */
+  public static List executeOn(Connection conn,
+                               ExecutablePool pool,
+                               String region,
+                               Object key,
+                               int interestType,
+                               InterestResultPolicy policy,
+                               boolean isDurable,
+                               boolean receiveUpdatesAsInvalidates,
+                               byte regionDataPolicy)
+  {
+    final AbstractOp interestOp = new RegisterInterestOpImpl(region, key,
+        interestType, policy, isDurable, receiveUpdatesAsInvalidates,
+        regionDataPolicy);
+    final Object result = pool.executeOn(conn, interestOp);
+    return (List) result;
+  }
+
+  
+  /** Not instantiable: this class only exposes static entry points. */
+  private RegisterInterestOp() {
+  }
+
+  /**
+   * The REGISTER_INTEREST request together with the handling of its chunked
+   * response. Subclasses reuse the response handling and supply their own
+   * message layout and op name through the protected constructor and
+   * {@link #getOpName()}.
+   */
+  protected static class RegisterInterestOpImpl extends AbstractOp {
+    /** Region name; used to look the region up while processing the response. */
+    protected String region;
+    /**
+     * Builds the seven-part message: region name, interest type, result
+     * policy, durable flag byte, key, updates-as-invalidates flag byte, and
+     * the two-byte data-policy part.
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public RegisterInterestOpImpl(String region,
+                                  Object key,
+                                  int interestType,
+                                  InterestResultPolicy policy,
+                                  boolean isDurable,
+                                  boolean receiveUpdatesAsInvalidates,
+                                  byte regionDataPolicy) {
+      super(MessageType.REGISTER_INTEREST, 7);
+      this.region = region;
+      getMessage().addStringPart(region);
+      getMessage().addIntPart(interestType);
+      getMessage().addObjPart(policy);
+      {
+        // durable flag is sent as a single byte: 0x01 = durable
+        byte durableByte = (byte)(isDurable ? 0x01 : 0x00);
+        getMessage().addBytesPart(new byte[] {durableByte});
+      }
+      getMessage().addStringOrObjPart(key);
+      // updates-as-invalidates flag is likewise sent as a single byte
+      byte notifyByte = (byte)(receiveUpdatesAsInvalidates ? 0x01 : 0x00);
+      getMessage().addBytesPart(new byte[] {notifyByte});
+
+      // The second byte '1' below tells server to serialize values in VersionObjectList.
+      // Java clients always expect serializeValues to be true in VersionObjectList unlike Native clients.
+      // This was being sent as part of GetAllOp prior to fixing #43684.
+      getMessage().addBytesPart(new byte[] {regionDataPolicy, (byte)0x01});
+    }
+    /**
+     * Constructor for subclasses that build their own message parts; it only
+     * forwards the message type and part count and records the region name.
+     * This constructor is used by our subclass CreateCQWithIROpImpl
+     * (and by RegisterInterestListOpImpl).
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    protected RegisterInterestOpImpl(String region, int msgType, int numParts) {
+      super(msgType, numParts);
+      this.region = region;
+    }
+
+    /** The response is read as a {@link ChunkedMessage}. */
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(1, Version.CURRENT);
+    }
+    
+    /**
+     * Not supported by this op: the response is handled by
+     * {@link #processResponse(Message, Connection)}, which needs the
+     * connection.
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Reads the chunked response and dispatches on its message type.
+     * <ul>
+     * <li>RESPONSE_FROM_PRIMARY: reads chunks until the last one. A chunk
+     * holding a VersionedObjectList is applied to the local region right away
+     * (when the region could be looked up); any other chunk object is
+     * collected as a list of keys.</li>
+     * <li>RESPONSE_FROM_SECONDARY: reads one chunk and returns null.</li>
+     * <li>EXCEPTION and REGISTER_INTEREST_DATA_ERROR: read one chunk and
+     * throw.</li>
+     * </ul>
+     *
+     * @param m the response message; must be a {@link ChunkedMessage}
+     * @param con the connection the response arrived on
+     * @return for a primary response, either the collected key lists or, if
+     *         any VersionedObjectList chunk was seen, a one-element list
+     *         holding a list with an empty VersionedObjectList; null for a
+     *         secondary response
+     * @throws ServerOperationException if the server sent an exception or a
+     *         register-interest data error
+     * @throws InternalGemFireError if the message type is not recognized
+     */
+    @Override  
+    protected Object processResponse(Message m, Connection con) throws Exception {
+      ChunkedMessage msg = (ChunkedMessage)m;
+      msg.readHeader();
+      switch (msg.getMessageType()) {
+      case MessageType.RESPONSE_FROM_PRIMARY: {
+        ArrayList serverKeys = new ArrayList();
+        VersionedObjectList serverEntries = null;
+        LocalRegion r = null;
+        
+        try {
+          r = (LocalRegion)GemFireCacheImpl.getInstance().getRegion(this.region);
+        }catch(Exception ex) {
+          // Deliberately ignored: r stays null and the chunks are still read
+          // below so the message is consumed.
+	//	GemFireCacheImpl.getInstance().getLogger().config("hitesh error " + ex.getClass());
+        }
+        
+        // listOfList wraps the single reusable 'list'; it is the shape passed
+        // to refreshEntriesFromServerKeys and returned to the caller.
+        ArrayList list = new ArrayList();
+        ArrayList listOfList = new ArrayList();
+        listOfList.add(list);
+
+        // Process the chunks
+        do {
+          // Read the chunk
+          msg.receiveChunk();
+
+          // Deserialize the result
+          Part part = msg.getPart(0);
+
+          Object partObj = part.getObject();
+          if (partObj instanceof Throwable) {
+            String s = "While performing a remote " + getOpName();
+            throw new ServerOperationException(s, (Throwable)partObj);
+            // Get the exception toString part.
+            // This was added for c++ thin client and not used in java
+            //Part exceptionToStringPart = msg.getPart(1);
+          }
+          else {
+            if (partObj instanceof VersionedObjectList) {
+              // serverEntries only records that at least one
+              // VersionedObjectList chunk arrived; nothing is added to it.
+              if (serverEntries == null) {
+                serverEntries = new VersionedObjectList(true);
+              }
+              ((VersionedObjectList)partObj).replaceNullIDs(con.getEndpoint().getMemberId());
+              // serverEntries.addAll((VersionedObjectList)partObj);
+              // Apply this chunk immediately instead of accumulating it.
+              list.clear();
+              list.add(partObj);
+
+	      if(r != null) {
+		try {      
+                  r.refreshEntriesFromServerKeys(con, listOfList, InterestResultPolicy.KEYS_VALUES);
+		}catch(Exception ex) {
+                  // Deliberately ignored: a failed refresh does not stop the
+                  // remaining chunks from being read.
+	//	  GemFireCacheImpl.getInstance().getLogger().config("hitesh error2 " + ex.getClass());
+		}
+	      }
+            } else {
+              // Add the result to the list of results
+              serverKeys.add((List)partObj);
+            }
+          }
+
+        } while (!msg.isLastChunk());
+        if (serverEntries != null) {
+          list.clear();
+          list.add(serverEntries); // serverEntries will always be empty.
+          return listOfList;
+        }
+        return serverKeys;
+      }
+      case MessageType.RESPONSE_FROM_SECONDARY:
+        // Read the chunk
+        msg.receiveChunk();
+        return null;
+      case MessageType.EXCEPTION:
+        // Read the chunk
+        msg.receiveChunk();
+        // Deserialize the result
+        Part part = msg.getPart(0);
+        // Get the exception toString part.
+        // This was added for c++ thin client and not used in java
+        //Part exceptionToStringPart = msg.getPart(1);
+        Object obj = part.getObject();
+        {
+          // The server-side throwable becomes the cause of the exception.
+          String s = this + ": While performing a remote " + getOpName();
+          throw new ServerOperationException(s, (Throwable) obj);
+        }
+      case MessageType.REGISTER_INTEREST_DATA_ERROR:
+        // Read the chunk
+        msg.receiveChunk();
+
+        // Deserialize the result
+        String errorMessage = msg.getPart(0).getString();
+        String s = this + ": While performing a remote " + getOpName() + ": ";
+        throw new ServerOperationException(s + errorMessage);
+      default:
+        throw new InternalGemFireError("Unknown message type "
+                                       + msg.getMessageType());
+      }
+    }
+    /**
+     * Name of this operation as it appears in exception messages; subclasses
+     * override it.
+     */
+    protected String getOpName() {
+      return "registerInterest";
+    }
+    /** Only REGISTER_INTEREST_DATA_ERROR is reported as an error response. */
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.REGISTER_INTEREST_DATA_ERROR;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startRegisterInterest();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterInterestSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterInterest(start, hasTimedOut(), hasFailed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java
new file mode 100644
index 0000000..9d67251
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterInterestTracker.java
@@ -0,0 +1,410 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.UnregisterAllInterest;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Used to keep track of what interest a client has registered.
+ * This code was extracted from the old ConnectionProxyImpl.
+ * @author darrel
+ * @since 5.7
+ */
+public class RegisterInterestTracker {
+  private static final Logger logger = LogService.getLogger();
+  
+  public final static int interestListIndex = 0;
+  public final static int durableInterestListIndex = 1;
+  public final static int interestListIndexForUpdatesAsInvalidates = 2;
+  public final static int durableInterestListIndexForUpdatesAsInvalidates = 3;
+  
+  private final FailoverInterestList[] fils = new FailoverInterestList[4];
+  
+  /** Manages CQs */
+  private final ConcurrentMap cqs /* <CqQuery,Boolean> */= new ConcurrentHashMap();
+  
+  public RegisterInterestTracker() {
+    // Give every slot (durable x updates-as-invalidates) its own, initially empty list.
+    for (int slot = 0; slot < this.fils.length; slot++) {
+      this.fils[slot] = new FailoverInterestList();
+    }
+  }
+  
+  public static int getInterestLookupIndex(boolean isDurable, boolean receiveUpdatesAsInvalidates) {
+    // Maps the two registration flags onto one of the four list-index constants:
+    //   non-durable, normal updates         -> interestListIndex
+    //   durable, normal updates             -> durableInterestListIndex
+    //   non-durable, updates-as-invalidates -> interestListIndexForUpdatesAsInvalidates
+    //   durable, updates-as-invalidates     -> durableInterestListIndexForUpdatesAsInvalidates
+    if (isDurable) {
+      return receiveUpdatesAsInvalidates
+          ? durableInterestListIndexForUpdatesAsInvalidates
+          : durableInterestListIndex;
+    }
+    return receiveUpdatesAsInvalidates
+        ? interestListIndexForUpdatesAsInvalidates
+        : interestListIndex;
+  }
+  
+  /**
+   * Returns the keys registered for a region across all four interest lists
+   * (durable or not, updates-as-invalidates or not).
+   *
+   * @param regionName name under which the region's interests are stored
+   * @param interestType one of the InterestType constants
+   * @return a new list holding the keys of every matching registration
+   */
+  public List getInterestList(String regionName, int interestType)
+  {
+    final boolean[] flagValues = {false, true};
+    ArrayList result = new ArrayList();
+
+    // Visit order (durable, updates-as-invalidates):
+    // (false,false), (false,true), (true,false), (true,true).
+    for (boolean durable : flagValues) {
+      for (boolean asInvalidates : flagValues) {
+        RegionInterestEntry entry =
+            readRegionInterests(regionName, interestType, durable, asInvalidates);
+        if (entry != null) {
+          result.addAll(entry.getInterests().keySet());
+        }
+      }
+    }
+
+    return result;
+  }
+
+  public void addSingleInterest(LocalRegion r, Object key, int interestType,
+      InterestResultPolicy pol, boolean isDurable, boolean receiveUpdatesAsInvalidates)
+  {
+    // forRemoval=false: the region's entry is created on demand, so it is never null here.
+    getRegionInterests(r, interestType, false, isDurable, receiveUpdatesAsInvalidates)
+        .getInterests().put(key, pol);
+  }
+
+  public boolean removeSingleInterest(LocalRegion r, Object key,
+      int interestType, boolean isDurable, boolean receiveUpdatesAsInvalidates)
+  {
+    // forRemoval=true: a missing region entry is reported as null, not created.
+    final RegionInterestEntry entry = getRegionInterests(r, interestType, true,
+        isDurable, receiveUpdatesAsInvalidates);
+    if (entry == null) {
+      return false;
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("removeSingleInterest region={} key={}", r.getFullPath(), key);
+    }
+
+    final boolean wasRegistered = entry.getInterests().remove(key) != null;
+    if (!wasRegistered) {
+      // Removing a key that was never registered is logged as a warning, not thrown.
+      logger.warn(LocalizedMessage.create(LocalizedStrings.RegisterInterestTracker_REMOVESINGLEINTEREST_KEY_0_NOT_REGISTERED_IN_THE_CLIENT, key));
+    }
+    return wasRegistered;
+  }
+
+  public void addInterestList(LocalRegion r, List keys,
+      InterestResultPolicy pol, boolean isDurable, boolean receiveUpdatesAsInvalidates)
+  {
+    // Key lists are always tracked under InterestType.KEY; the entry is created on demand.
+    final ConcurrentMap registered = getRegionInterests(r, InterestType.KEY, false,
+        isDurable, receiveUpdatesAsInvalidates).getInterests();
+    for (int idx = 0; idx < keys.size(); idx++) {
+      registered.put(keys.get(idx), pol); }
+  }
+
+  public void addCq(InternalCqQuery cqi, boolean isDurable)
+  {
+    // CQs are tracked in the tracker-wide cqs map, keyed by the query object
+    // with its durable flag as the value; they are not stored in the
+    // per-region interest lists (an earlier variant that did so via
+    // getRegionInterests(r, InterestType.CQ, ...) was disabled).
+    this.cqs.put(cqi, isDurable ? Boolean.TRUE : Boolean.FALSE);
+  }
+  
+  public void removeCq(InternalCqQuery cqi, boolean isDurable)
+  {
+    // Drops the query from the tracker-wide cqs map. The isDurable argument
+    // is not consulted: the map is keyed by the query object alone. (An
+    // earlier per-region variant, which removed cqi.getName() from a
+    // RegionInterestEntry, was disabled.)
+    this.cqs.remove(cqi);
+  }
+
+  public Map getCqsMap(){ // maps each tracked CQ to its durable flag (a Boolean)
+    return this.cqs; // the live map itself, not a copy
+  }
+  
+  /**
+   * Unregisters every interest registered on the region of the given proxy.
+   */
+  public void unregisterRegion(ServerRegionProxy srp,
+                               boolean keepalive) {
+    // Interest types, in the order in which they are unregistered.
+    final int[] interestTypes = {InterestType.KEY, InterestType.FILTER_CLASS,
+        InterestType.OQL_QUERY, InterestType.REGULAR_EXPRESSION};
+    // Non-durable registrations: plain updates first, then updates-as-invalidates.
+    for (int type : interestTypes) {
+      removeAllInterests(srp, type, false, keepalive, false);
+    }
+    for (int type : interestTypes) {
+      removeAllInterests(srp, type, false, keepalive, true);
+    }
+    if (srp.getPool().isDurableClient()) {
+      // Durable registrations: updates-as-invalidates first, then plain updates.
+      for (int type : interestTypes) {
+        removeAllInterests(srp, type, true, keepalive, true);
+      }
+      for (int type : interestTypes) {
+        removeAllInterests(srp, type, true, keepalive, false);
+      }
+    }
+  }
+  
+  /**
+   * Remove all interests of a given type on the given proxy's region.
+   * @param interestType
+   *          the interest type
+   * @param durable
+   *          a boolean stating whether to remove durable or non-durable registrations
+   */
+  private void removeAllInterests(ServerRegionProxy srp, int interestType,
+      boolean durable, boolean keepAlive, boolean receiveUpdatesAsInvalidates)
+  {
+    String regName = srp.getRegionName();
+    ConcurrentMap allInterests = getRegionToInterestsMap(interestType, durable, receiveUpdatesAsInvalidates);
+    if (allInterests.remove(regName) != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("removeAllInterests region={} type={}", regName, InterestType.getString(interestType));
+      }
+      try {
+        // fix bug 35680 by using a UnregisterAllInterest token
+        Object key = UnregisterAllInterest.singleton();
+        // we have already cleaned up the tracker so send the op directly
+        UnregisterInterestOp.execute(srp.getPool(), regName, key, interestType,
+                                     true/*isClosing*/, keepAlive);
+      }
+      catch (Exception e) {
+        if(srp.getPool().getCancelCriterion().cancelInProgress() == null) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.RegisterInterestTracker_PROBLEM_REMOVING_ALL_INTEREST_ON_REGION_0_INTERESTTYPE_1_2,
+              new Object[] {regName, InterestType.getString(interestType), e.getLocalizedMessage()}));
+        }
+      }
+    }
+  }
+
+  public boolean removeInterestList(LocalRegion r, List keys, boolean isDurable,
+      boolean receiveUpdatesAsInvalidates)
+  {
+    final RegionInterestEntry entry = getRegionInterests(r, InterestType.KEY, true,
+        isDurable, receiveUpdatesAsInvalidates);
+    if (entry == null) {
+      return false;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("removeInterestList region={} keys={}", r.getFullPath(), keys);
+    }
+    // True once at least one of the requested keys was actually registered.
+    boolean removedAny = false;
+    for (int idx = 0; idx < keys.size(); idx++) {
+      final Object key = keys.get(idx);
+      if (entry.getInterests().remove(key) != null) {
+        removedAny = true;
+        continue;
+      }
+      // Unknown keys are reported individually; the remaining keys are still processed.
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.RegisterInterestTracker_REMOVEINTERESTLIST_KEY_0_NOT_REGISTERED_IN_THE_CLIENT,
+          key));
+    }
+    return removedAny;
+  }
+
+  /**
+   * Return the map of interests for a given interest type. The keys in this Map are
+   * the full names of the regions. The values are instances of RegionInterestEntry.
+   *
+   * @param interestType
+   *          the type to return
+   * @return the map
+   */
+  public ConcurrentMap getRegionToInterestsMap(int interestType, boolean isDurable,
+      boolean receiveUpdatesAsInvalidates)
+  {
+    // Two-step selection: first the list that belongs to the
+    // durable / updates-as-invalidates combination, then the map
+    // inside that list that belongs to the requested interest type.
+    final int slot = getInterestLookupIndex(isDurable, receiveUpdatesAsInvalidates);
+    final FailoverInterestList list = this.fils[slot];
+
+    switch (interestType) {
+    case InterestType.KEY:
+      return list.keysOfInterest;
+
+    case InterestType.REGULAR_EXPRESSION:
+      return list.regexOfInterest;
+
+    case InterestType.FILTER_CLASS:
+      return list.filtersOfInterest;
+
+    case InterestType.CQ:
+      return list.cqsOfInterest;
+
+    case InterestType.OQL_QUERY:
+      return list.queriesOfInterest;
+
+    default:
+      throw new InternalGemFireError("Unknown interestType");
+    }
+  }
+
+  /**
+   * Return the RegionInterestEntry for the given region. Create one if none
+   * exists and forRemoval is false.
+   *
+   * @param r
+   *          specified region
+   * @param interestType
+   *          desired interest type
+   * @param forRemoval
+   *          true if the caller wants one for removal
+   * @return the entry or null if none exists and forRemoval is true.
+   */
+  private RegionInterestEntry getRegionInterests(LocalRegion r,
+                                                 int interestType,
+                                                 boolean forRemoval,
+                                                 boolean isDurable,
+                                                 boolean receiveUpdatesAsInvalidates)
+  {
+    final String regionName = r.getFullPath();
+    final ConcurrentMap regionToEntry =
+        getRegionToInterestsMap(interestType, isDurable, receiveUpdatesAsInvalidates);
+    final RegionInterestEntry existing = (RegionInterestEntry)regionToEntry.get(regionName);
+    if (existing != null || forRemoval) {
+      return existing; // found, or the caller only wants an existing entry (may be null)
+    }
+    // Create on demand; if an entry was put in the meantime, putIfAbsent returns it and it wins.
+    final RegionInterestEntry created = new RegionInterestEntry(r);
+    final RegionInterestEntry raced =
+        (RegionInterestEntry)regionToEntry.putIfAbsent(regionName, created);
+    return raced != null ? raced : created;
+  }
+  private RegionInterestEntry readRegionInterests(String regionName,
+                                                  int interestType,
+                                                  boolean isDurable,
+                                                  boolean receiveUpdatesAsInvalidates)
+  {
+    return (RegionInterestEntry)getRegionToInterestsMap(interestType, isDurable,
+        receiveUpdatesAsInvalidates).get(regionName); // null if the region has no entry
+  }
+
+  /**
+   * A holder for the interests a client has registered. It is required when
+   * a client fails over to another server and has to register its interests
+   * again based on this data structure.
+   * 
+   * @author Yogesh Mahajan
+   * @since 5.5
+   *
+   */
+   static protected class FailoverInterestList {
+    /**
+     * Record of enumerated keys of interest (selected for InterestType.KEY).
+     *
+     * This list is maintained here in case an endpoint (server) bounces. In that
+     * case, a message will be sent to the endpoint as soon as it has restarted.
+     *
+     * The keys in this Map are the full names of the regions. The values are
+     * instances of {@link RegionInterestEntry}.
+     */
+    final ConcurrentMap keysOfInterest = new ConcurrentHashMap();
+
+    /**
+     * Record of regular expression keys of interest (InterestType.REGULAR_EXPRESSION).
+     *
+     * This list is maintained here in case an endpoint (server) bounces. In that
+     * case, a message will be sent to the endpoint as soon as it has restarted.
+     *
+     * The keys in this Map are the full names of the regions. The values are
+     * instances of {@link RegionInterestEntry}.
+     */
+     final ConcurrentMap regexOfInterest = new ConcurrentHashMap();
+
+    /**
+     * Record of filtered keys of interest (InterestType.FILTER_CLASS).
+     *
+     * This list is maintained here in case an endpoint (server) bounces. In that
+     * case, a message will be sent to the endpoint as soon as it has restarted.
+     *
+     * The keys in this Map are the full names of the regions. The values are
+     * instances of {@link RegionInterestEntry}.
+     */
+     final ConcurrentMap filtersOfInterest = new ConcurrentHashMap();
+
+    /**
+     * Record of OQL query keys of interest (InterestType.OQL_QUERY).
+     *
+     * This list is maintained here in case an endpoint (server) bounces. In that
+     * case, a message will be sent to the endpoint as soon as it has restarted.
+     *
+     * The keys in this Map are the full names of the regions. The values are
+     * instances of {@link RegionInterestEntry}.
+     */
+     final ConcurrentMap queriesOfInterest = new ConcurrentHashMap();
+     
+     /**
+      * Record of registered CQs (selected for InterestType.CQ).
+      * NOTE(review): addCq/removeCq use the tracker-level cqs map instead; confirm this map is still populated.
+      */
+      final ConcurrentMap cqsOfInterest = new ConcurrentHashMap();
+  }
+
+  /**
+   * Description of the interests of a particular region.
+   *
+   * @author jpenney
+   *
+   */
+  static public class RegionInterestEntry
+  {
+    /** Region whose registrations this entry describes. */
+    private final LocalRegion region;
+    /** Registered interests; the tracker stores each key with its InterestResultPolicy. */
+    private final ConcurrentMap interests = new ConcurrentHashMap();
+
+    RegionInterestEntry(LocalRegion r) {
+      this.region = r;
+    }
+
+    public LocalRegion getRegion() {
+      return this.region;
+    }
+
+    public ConcurrentMap getInterests() {
+      return this.interests;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
new file mode 100644
index 0000000..b4bf64d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
@@ -0,0 +1,382 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Does a region removeAll on a server
+ * @author darrel
+ * @since 8.1
+ */
+public class RemoveAllOp {
+  
+  public static final Logger logger = LogService.getLogger();
+  
+  public static final int FLAG_EMPTY = 0x01;
+  public static final int FLAG_CONCURRENCY_CHECKS = 0x02;
+
+  
+  /**
+   * Does a region removeAll on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the region to do the removeAll on
+   * @param keys Collection of keys to remove
+   * @param eventId the event id for this op
+   */
+  public static VersionedObjectList execute(ExecutablePool pool,
+                             Region region,
+                             Collection<Object> keys,
+                             EventID eventId,
+                             boolean isRetry, Object callbackArg)
+  {
+    final boolean singleHop = ((PoolImpl)pool).getPRSingleHopEnabled();
+    final RemoveAllOpImpl removeAll = new RemoveAllOpImpl(region, keys, eventId, singleHop, callbackArg);
+    removeAll.initMessagePart();
+    if (isRetry) {
+      removeAll.getMessage().setIsRetry(); // flag the message as a retry before it is executed
+    }
+    return (VersionedObjectList)pool.execute(removeAll);
+  }
+  
+  /**
+   * Does a region removeAll on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param region the region to do the removeAll on
+   * @param keys the Collection of keys to remove
+   * @param eventId the event id for this removeAll
+   */
+  public static VersionedObjectList execute(ExecutablePool pool,
+                             Region region,
+                             Collection<Object> keys,
+                             EventID eventId, 
+                             int retryAttempts, Object callbackArg)
+  {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    ClientMetadataService cms = ((LocalRegion)region).getCache()
+        .getClientMetadataService();
+    // Ask the metadata service for a map from each server to the subset of keys sent to it.
+    Map<ServerLocation, HashSet> serverToFilterMap = cms.getServerToFilterMap(
+        keys, region, true);
+    // No per-server grouping available: send a single op with all keys through the pool.
+    if (serverToFilterMap == null || serverToFilterMap.isEmpty()) {
+      AbstractOp op = new RemoveAllOpImpl(region, keys,
+          eventId, ((PoolImpl)pool).getPRSingleHopEnabled(), callbackArg);
+      op.initMessagePart();
+      return (VersionedObjectList)pool.execute(op);
+    }
+    // Otherwise build one task per server and submit them together.
+    List callableTasks = constructAndGetRemoveAllTasks(region,
+        eventId, serverToFilterMap, (PoolImpl)pool, callbackArg);
+
+    if (isDebugEnabled) {
+      logger.debug("RemoveAllOp#execute : Number of removeAll tasks is :{}", callableTasks.size());
+    }
+    HashMap<ServerLocation, RuntimeException> failedServers = new HashMap<ServerLocation,RuntimeException>();
+    PutAllPartialResult result = new PutAllPartialResult(keys.size());
+    try {
+      Map<ServerLocation, Object> results = SingleHopClientExecutor
+          .submitBulkOp(callableTasks, cms, (LocalRegion)region, failedServers);
+      for (Map.Entry<ServerLocation, Object> entry: results.entrySet()) {
+        Object value = entry.getValue();
+        if (value instanceof PutAllPartialResultException) {
+          PutAllPartialResultException pap = (PutAllPartialResultException)value;
+          if (isDebugEnabled) {
+            logger.debug("RemoveAll SingleHop encountered BulkOpPartialResultException exception: {}, failedServers are {}", pap, failedServers.keySet());
+          }
+          result.consolidate(pap.getResult());
+        } else {
+          if (value != null) {
+            VersionedObjectList list = (VersionedObjectList)value;
+            result.addKeysAndVersions(list);
+          }
+        }
+      }
+    } catch (RuntimeException ex) {
+      logger.debug("single-hop removeAll encountered unexpected exception: {}",ex);
+        throw ex;
+      }
+
+    if (!failedServers.isEmpty()) {
+      if (retryAttempts == 0) {
+        throw failedServers.values().iterator().next(); // no retries allowed: rethrow one of the recorded failures
+      }
+
+      // if the partial result set doesn't already have keys (for tracking version tags)
+      // then we need to gather up the keys that we know have succeeded so far and
+      // add them to the partial result set
+      if (result.getSucceededKeysAndVersions().size() == 0) {
+        // Some servers failed, so remember which keys already succeeded. If every
+        // retry succeeds this bookkeeping is simply discarded; otherwise the saved
+        // keys are consolidated into the PutAllPartialResultException.
+        // succeedKeySet is only reported back to the caller in the partial-result
+        // case, so using a LinkedHashSet is not strictly required.
+      Set succeedKeySet = new LinkedHashSet();
+      Set<ServerLocation> serverSet = serverToFilterMap.keySet();
+      for (ServerLocation server : serverSet) {
+        if (!failedServers.containsKey(server)) {
+          succeedKeySet.addAll(serverToFilterMap.get(server));
+        }
+      }
+  
+      // save succeedKeys, but if retries all succeeded, discard the PutAllPartialResult
+        result.addKeys(succeedKeySet);
+      }
+      
+      // Retry the key sets of the failed servers one by one instead of merging
+      // them into one big set. The reason is, we have to keep the same event
+      // ids for each subset. There is a unit test in PutAllCSDUnitTest for
+      // the otherwise case.
+      boolean oneSubMapRetryFailed = false;
+      Set<ServerLocation> failedServerSet = failedServers.keySet();
+      for (ServerLocation failedServer : failedServerSet) {
+        // the exception recorded for this server by the first attempt
+        RuntimeException savedRTE = failedServers.get(failedServer);
+        if (savedRTE instanceof PutAllPartialResultException) {
+          // no retry for a PutAllPartialResultException,
+          // but it does mean that at least one subset failed
+          oneSubMapRetryFailed = true;
+          continue;
+        }
+        Collection<Object> newKeys = serverToFilterMap.get(failedServer);
+        try {
+          VersionedObjectList v = RemoveAllOp.execute(pool, region, newKeys, eventId, true, callbackArg);
+          if (v == null) {
+            result.addKeys(newKeys);
+          } else {
+            result.addKeysAndVersions(v);
+          }
+        } catch (PutAllPartialResultException pre) {
+          oneSubMapRetryFailed = true;
+          logger.debug("Retry failed with BulkOpPartialResultException: {} Before retry: {}", pre, result.getKeyListString());
+          result.consolidate(pre.getResult());
+        } catch (Exception rte) {
+          oneSubMapRetryFailed = true;
+          Object firstKey = newKeys.iterator().next();
+          result.saveFailedKey(firstKey, rte); // only the first key of the subset is recorded as failed
+        }
+      } // for failedServer
+
+      // If all retries succeeded, the PRE in first tries can be ignored
+      if (oneSubMapRetryFailed && result.hasFailure()) {
+        PutAllPartialResultException pre = new PutAllPartialResultException(result);
+        throw pre;
+      }
+    } // !failedServers.isEmpty()
+
+    return result.getSucceededKeysAndVersions();
+  }
+  
+  private RemoveAllOp() {
+    // no instances allowed: this class only offers static entry points
+  }
+  
+  
+  static List constructAndGetRemoveAllTasks(Region region,
+      final EventID eventId,
+      final Map<ServerLocation, HashSet> serverToFilterMap,
+      final PoolImpl pool, Object callbackArg) {
+    // Snapshot the target servers so logging and task creation see the same set.
+    final ArrayList<ServerLocation> targets =
+        new ArrayList<ServerLocation>(serverToFilterMap.keySet());
+    if (logger.isDebugEnabled()) {
+      logger.debug("Constructing tasks for the servers{}", targets);
+    }
+    final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>();
+    for (ServerLocation target : targets) {
+      // One op per server, carrying only that server's keys; the single-hop flag is always true here.
+      final AbstractOp perServerOp = new RemoveAllOpImpl(region,
+          serverToFilterMap.get(target), eventId, true, callbackArg);
+      final ServerLocation destination =
+          new ServerLocation(target.getHostName(), target.getPort());
+      tasks.add(new SingleHopOperationCallable(destination, pool, perServerOp,
+          UserAttributes.userAttributes.get()));
+    }
+    return tasks;
+  }
+
+  private static class RemoveAllOpImpl extends AbstractOp {
+    
+    private boolean prSingleHopEnabled = false;
+    
+    private LocalRegion region = null;
+    
+    private Collection<Object> keys = null;
+    private final Object callbackArg;
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public RemoveAllOpImpl(Region region, Collection<Object> keys,
+        EventID eventId, boolean prSingleHopEnabled, Object callbackArg) {
+      super(MessageType.REMOVE_ALL, 5 + keys.size()); // region path, event id, flags, callback arg, key count + one part per key
+      this.region = (LocalRegion)region;
+      this.keys = keys;
+      this.callbackArg = callbackArg;
+      this.prSingleHopEnabled = prSingleHopEnabled;
+      getMessage().addStringPart(region.getFullPath());
+      getMessage().addBytesPart(eventId.calcBytes());
+    }
+    
+    @Override
+    protected void initMessagePart() {
+      int size = keys.size();
+      int flags = 0;
+      if (region.getDataPolicy() == DataPolicy.EMPTY) {
+        flags |= FLAG_EMPTY;
+      }
+      if (region.getConcurrencyChecksEnabled()) {
+        flags |= FLAG_CONCURRENCY_CHECKS;
+      }
+      getMessage().addIntPart(flags);
+      getMessage().addObjPart(this.callbackArg);
+      getMessage().addIntPart(size);
+
+      for (Object key: this.keys) {
+        getMessage().addStringOrObjPart(key);
+      }      
+    }
+    @Override  
+    protected Message createResponseMessage() {
+      return new ChunkedMessage(2, Version.CURRENT); // processResponse(Message, Connection) casts its message to ChunkedMessage
+    }
+    
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new UnsupportedOperationException(); // this op implements the (Message, Connection) overload instead
+    }
+    
+    @Override
+    protected Object processResponse(final Message msg, final Connection con) throws Exception {
+      final VersionedObjectList result = new VersionedObjectList();
+      final Exception[] exceptionRef = new Exception[1]; // one-slot holder so the anonymous handler can report a failure
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      try {
+        processChunkedResponse((ChunkedMessage)msg,
+                             "removeAll",
+                             new ChunkHandler() {
+                               public void handle(ChunkedMessage cm) throws Exception {
+                                 int numParts = msg.getNumberOfParts(); // NOTE(review): reads msg, not cm - presumably the same object; confirm
+                                 if (isDebugEnabled) {
+                                   logger.debug("RemoveAllOp.processChunkedResponse processing message with {} parts", numParts);
+                                 }
+                                 for (int partNo=0; partNo < numParts; partNo++) {
+                                   Part part = cm.getPart(partNo);
+                                   try {
+                                     Object o = part.getObject();
+                                     if (isDebugEnabled) {
+                                       logger.debug("part({}) contained {}", partNo, o);
+                                     }
+                                     if (o == null) {
+                                       // no response is an okay response
+                                     } else if (o instanceof byte[]) {
+                                       if (prSingleHopEnabled) {
+                                         byte[] bytesReceived = part.getSerializedForm();
+                                         if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop
+                                           if (region != null) {
+                                             ClientMetadataService cms;
+                                             try {
+                                               cms = region.getCache().getClientMetadataService();
+                                               cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+                                             }
+                                             catch (CacheClosedException e) { // ignored: the metadata refresh is simply skipped
+                                             }
+                                           }
+                                         }
+                                       }
+                                     } else if (o instanceof Throwable) {
+                                       String s = "While performing a remote removeAll";
+                                       exceptionRef[0] = new ServerOperationException(s, (Throwable)o);
+                                     } else {
+                                       VersionedObjectList chunk = (VersionedObjectList)o;
+                                       chunk.replaceNullIDs(con.getEndpoint().getMemberId());
+                                       result.addAll(chunk);
+                                     }
+                                   } catch(Exception e) { // remembered and thrown after processing; a later failure overwrites it
+                                     exceptionRef[0] = new ServerOperationException("Unable to deserialize value" , e);
+                                   }
+                                 }
+                               }
+                             });
+      } catch (ServerOperationException e) {
+        if (e.getCause() instanceof PutAllPartialResultException) { // unwrap a partial result and rethrow it directly
+          PutAllPartialResultException cause = (PutAllPartialResultException)e.getCause(); 
+          cause.getSucceededKeysAndVersions().replaceNullIDs(con.getEndpoint().getMemberId());
+          throw cause;
+        } else {
+          throw e;
+        }
+      }
+      if (exceptionRef[0] != null) {
+        throw exceptionRef[0];
+      } else {
+        // v7.0.1: the reply has versions but no keys, so fill in the keys that were sent
+        if (result.hasVersions() && result.getKeys().isEmpty()) {
+          if (logger.isTraceEnabled()) {
+            logger.trace("setting keys of response to {}", this.keys);
+          }
+          ArrayList<Object> tmpKeys;
+          if (this.keys instanceof ArrayList) {
+            tmpKeys = (ArrayList<Object>) this.keys; // reuse the existing list instead of copying it
+          } else {
+            tmpKeys = new ArrayList<Object>(this.keys);
+          }
+          result.setKeys(tmpKeys);
+        }
+      }
+      return result;
+    }
+    
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.PUT_DATA_ERROR == msgType; // removeAll checks against the put error type
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startRemoveAll(); // delegates to the removeAll start statistic
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endRemoveAllSend(start, hasFailed()); // records the send phase with the failed flag
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endRemoveAll(start, hasTimedOut(), hasFailed()); // records the attempt with timed-out and failed flags
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
new file mode 100644
index 0000000..35c4494
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RollbackOp.java
@@ -0,0 +1,90 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Client-side operation that asks a server to roll back a transaction.
+ * Only the static {@link #execute(ExecutablePool, int)} entry point is exposed.
+ * @since 6.6
+ * @author sbawaska
+ */
+public class RollbackOp {
+
+  private RollbackOp() {
+    // holder for the static entry point only; never instantiated
+  }
+
+  /**
+   * Rolls back the given transaction by running a rollback operation on the pool.
+   * @param pool the pool to use to communicate with the server.
+   * @param txId the id of the transaction to rollback
+   */
+  public static void execute(ExecutablePool pool, int txId) {
+    pool.execute(new RollbackOpImpl(txId));
+  }
+
+  /**
+   * The operation handed to the pool: a ROLLBACK message tagged with the
+   * transaction id.
+   */
+  private static class RollbackOpImpl extends AbstractOp {
+    /** id of the transaction being rolled back; also used by toString */
+    private int txId;
+
+    /**
+     * Builds the ROLLBACK message and stamps it with the transaction id.
+     * @param txId the id of the transaction to rollback
+     */
+    protected RollbackOpImpl(int txId) {
+      super(MessageType.ROLLBACK, 1);
+      this.txId = txId;
+      getMessage().setTransactionId(txId);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("Rollback(txId=");
+      text.append(this.txId);
+      text.append(")");
+      return text.toString();
+    }
+
+    /**
+     * Treats the reply as an acknowledgement; this op has no result value.
+     * @return always null
+     */
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "rollback");
+      return null;
+    }
+
+    /**
+     * @return true only when msgType equals MessageType.EXCEPTION
+     */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      final boolean exceptionReply = (MessageType.EXCEPTION == msgType);
+      return exceptionReply;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      final long attemptStart = stats.startRollback();
+      return attemptStart;
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      final boolean failed = hasFailed();
+      stats.endRollbackSend(start, failed);
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      final boolean timedOut = hasTimedOut();
+      final boolean failed = hasFailed();
+      stats.endRollback(start, timedOut, failed);
+    }
+
+    /**
+     * Intentionally does nothing for this operation.
+     */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // deliberately empty
+    }
+
+    /**
+     * @return always false
+     */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /**
+     * Sends the message after masking the early-ack byte so that only the
+     * bits selected by Message.MESSAGE_HAS_SECURE_PART remain set.
+     */
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      final byte maskedAck =
+          (byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      getMessage().setEarlyAck(maskedAck);
+      getMessage().send(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java
new file mode 100644
index 0000000..aec202d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerBlackList.java
@@ -0,0 +1,179 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * This class is designed to prevent the client from spinning
+ * and reconnecting to the same failed server over and over.
+ * We've removed the old dead server monitor code because
+ * the locator is supposed to keep track of what servers are
+ * alive or dead. However, there is still the possibility that the locator
+ * may tell us a server is alive but we are unable to reach it.
+ *
+ * This class keeps track of the number of consecutive failures
+ * that happen on each server. Once the number of failures reaches
+ * {@link #THRESHOLD}, the server is added to a blacklist for
+ * <code>pingInterval</code> milliseconds. After that time has expired the
+ * server comes off the blacklist. Expiry does not clear the failure count,
+ * so unless {@link FailureTracker#reset()} was called in the meantime the
+ * next failure puts the server back on the list.
+ *
+ * @author dsmith
+ *
+ */
+public class ServerBlackList {
+
+  private static final Logger logger = LogService.getLogger();
+
+  /** One tracker per server location; guarded by synchronizing on the map itself. */
+  private final Map<ServerLocation, FailureTracker> failureTrackerMap =
+      new HashMap<ServerLocation, FailureTracker>();
+  /** Locations that are currently blacklisted. */
+  protected final Set blacklist = new CopyOnWriteArraySet();
+  /** Read-only view of {@link #blacklist} handed out by {@link #getBadServers()}. */
+  private final Set unmodifiableBlacklist = Collections.unmodifiableSet(blacklist);
+  /**
+   * Executor used to schedule blacklist expiry. It is only assigned in
+   * {@link #start(ScheduledExecutorService)}.
+   * NOTE(review): if a tracker reaches the threshold before start() has been
+   * called this is still null when addFailure dereferences it.
+   */
+  protected ScheduledExecutorService background;
+  /** Fans blacklist changes out to the registered listeners. */
+  protected final ListenerBroadcaster broadcaster = new ListenerBroadcaster();
+
+  //not final for tests.
+  static int THRESHOLD = Integer.getInteger("gemfire.ServerBlackList.THRESHOLD", 3).intValue();
+  /** How long, in milliseconds, a server stays on the blacklist. */
+  protected final long pingInterval;
+
+  /**
+   * @param pingInterval the number of milliseconds a server remains blacklisted
+   */
+  public ServerBlackList(long pingInterval) {
+    this.pingInterval = pingInterval;
+  }
+
+  /**
+   * Supplies the executor on which blacklist expiry tasks are scheduled.
+   * @param background the executor to use for expiry tasks
+   */
+  public void start(ScheduledExecutorService background) {
+    this.background = background;
+  }
+
+  /**
+   * Returns the failure tracker for the given server, creating and
+   * remembering one on first use.
+   * @param location the server whose tracker is wanted
+   * @return the tracker associated with location; never null
+   */
+  FailureTracker getFailureTracker(ServerLocation location) {
+    synchronized(failureTrackerMap) {
+      FailureTracker failureTracker = failureTrackerMap.get(location);
+      if(failureTracker == null) {
+        failureTracker = new FailureTracker(location);
+        failureTrackerMap.put(location, failureTracker);
+      }
+      return failureTracker;
+    }
+  }
+
+  /**
+   * @return an unmodifiable view of the currently blacklisted server locations
+   */
+  public Set getBadServers() {
+    return unmodifiableBlacklist;
+  }
+
+  /**
+   * Counts consecutive failures for a single server and blacklists the
+   * server once the count reaches {@link ServerBlackList#THRESHOLD}.
+   */
+  public class FailureTracker {
+    private final AtomicInteger consecutiveFailures = new AtomicInteger();
+    private final ServerLocation location;
+
+    public FailureTracker(ServerLocation location) {
+      this.location = location;
+    }
+
+    /**
+     * Sets the consecutive failure count back to zero.
+     */
+    public void reset() {
+      consecutiveFailures.set(0);
+    }
+
+    /**
+     * Records one failure. When the count reaches the threshold the server
+     * is blacklisted, listeners are told, and an expiry task is scheduled
+     * to run after <code>pingInterval</code> milliseconds.
+     */
+    public void addFailure() {
+      if(blacklist.contains(location)) {
+        //A second failure must have happened before we added
+        //this server to the blacklist. Don't count that failure.
+        return;
+      }
+      int failures = consecutiveFailures.incrementAndGet();
+      if(failures < THRESHOLD) {
+        return;
+      }
+      //The contains() check above is not atomic with this add. If another
+      //thread blacklisted the server in between, add() returns false and
+      //that thread already notified the listeners and scheduled the expiry;
+      //doing so again would produce duplicate events and duplicate tasks.
+      if(!blacklist.add(location)) {
+        return;
+      }
+      if(logger.isDebugEnabled()) {
+        logger.debug("Blacklisting server {} for {}ms because it had {} consecutive failures", location, pingInterval, failures);
+      }
+      broadcaster.serverAdded(location);
+      try {
+        background.schedule(new ExpireBlackListTask(location), pingInterval, TimeUnit.MILLISECONDS);
+      } catch(RejectedExecutionException e) {
+        //ignore, the timer has been cancelled, which means we're shutting down.
+      }
+    }
+  }
+
+  /**
+   * Registers a listener to be told when servers enter or leave the blacklist.
+   */
+  public void addListener(BlackListListener blackListListener) {
+    broadcaster.listeners.add(blackListListener);
+  }
+
+  /**
+   * Unregisters a listener previously passed to addListener.
+   */
+  public void removeListener(BlackListListener blackListListener) {
+    broadcaster.listeners.remove(blackListListener);
+  }
+
+  /**
+   * Scheduled task that takes one server off the blacklist and notifies
+   * the listeners. It does not touch the server's failure count.
+   */
+  private class ExpireBlackListTask extends PoolTask {
+    private final ServerLocation location;
+
+    public ExpireBlackListTask(ServerLocation location) {
+      this.location = location;
+    }
+
+    @Override
+    public void run2() {
+      if(logger.isDebugEnabled()) {
+        logger.debug("{} is no longer blacklisted", location);
+      }
+      blacklist.remove(location);
+      broadcaster.serverRemoved(location);
+    }
+  }
+
+  /**
+   * Callback interface for blacklist membership changes.
+   */
+  public static interface BlackListListener {
+    /** Called after location has been added to the blacklist. */
+    public void serverAdded(ServerLocation location);
+
+    /** Called after location has been removed from the blacklist. */
+    public void serverRemoved(ServerLocation location);
+  }
+
+  /**
+   * Convenience base class whose callbacks do nothing.
+   */
+  public static class BlackListListenerAdapter implements BlackListListener {
+    public void serverAdded(ServerLocation location) {
+      //do nothing
+    }
+
+    public void serverRemoved(ServerLocation location) {
+      //do nothing
+    }
+  }
+
+  /**
+   * Listener that forwards each event to every registered listener, in the
+   * iteration order of the listener set.
+   */
+  protected static class ListenerBroadcaster implements BlackListListener {
+
+    protected Set listeners = new CopyOnWriteArraySet();
+
+    public void serverAdded(ServerLocation location) {
+      for(Iterator itr = listeners.iterator(); itr.hasNext(); ) {
+        BlackListListener listener = (BlackListListener) itr.next();
+        listener.serverAdded(location);
+      }
+    }
+
+    public void serverRemoved(ServerLocation location) {
+      for(Iterator itr = listeners.iterator(); itr.hasNext(); ) {
+        BlackListListener listener = (BlackListListener) itr.next();
+        listener.serverRemoved(location);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java
new file mode 100644
index 0000000..a428e01
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerProxy.java
@@ -0,0 +1,60 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Used to send operations from a client to a server.
+ * Each operation is carried out through the pool supplied at construction.
+ * @author darrel
+ * @since 5.7
+ */
+public class ServerProxy {
+  /** the pool every operation of this proxy is executed on */
+  protected final InternalPool pool;
+
+  /**
+   * Creates a server proxy for the given pool.
+   * @param pool the pool that this proxy will use to communicate with servers
+   */
+  public ServerProxy(InternalPool pool) {
+    this.pool = pool;
+  }
+
+  /**
+   * Returns the pool the proxy is using.
+   * @return the pool given to the constructor
+   */
+  public InternalPool getPool() {
+    return pool;
+  }
+
+  /**
+   * Release use of this pool by calling its detach method.
+   */
+  public void detach() {
+    pool.detach();
+  }
+
+  /**
+   * Ping the specified server to see if it is still alive
+   * @param server the server to do the execution on
+   */
+  public void ping(ServerLocation server) {
+    PingOp.execute(pool, server);
+  }
+
+  /**
+   * Does a query on a server
+   * @param queryPredicate A query language boolean query predicate
+   * @param queryParams values handed to the query operation along with
+   *            the predicate
+   * @return  A <code>SelectResults</code> containing the values
+   *            that match the <code>queryPredicate</code>.
+   */
+  public SelectResults query(String queryPredicate, Object[] queryParams) {
+    final SelectResults matches = QueryOp.execute(pool, queryPredicate, queryParams);
+    return matches;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java
new file mode 100644
index 0000000..0787a03
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ServerRegionDataAccess.java
@@ -0,0 +1,134 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation;
+
+/**
+ * Operations for reading and modifying, on a server, the data of the region
+ * to which this interface provides access.
+ */
+public interface ServerRegionDataAccess {
+
+  /**
+   * Does a get on the server
+   * @param key the entry key to do the get on
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   * @param clientEvent the client event, if any, for version propagation
+   * @return the entry value found by the get if any
+   */
+  Object get(Object key, Object callbackArg, EntryEventImpl clientEvent);
+
+  /**
+   * Does a region put on the server
+   * @param key the entry key to do the put on
+   * @param value the entry value to put
+   * @param deltaBytes not described by the original documentation; presumably
+   *          the delta form of the value - confirm against implementations
+   * @param clientEvent the client event, if any, for eventID and version tag propagation
+   * @param op the operation type of this event
+   * @param requireOldValue not described by the original documentation;
+   *          presumably whether the previous value is wanted back - confirm
+   * @param expectedOldValue not described by the original documentation;
+   *          presumably the value the entry must currently hold - confirm
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   * @param isCreateFwd not described by the original documentation - confirm
+   *          against implementations
+   * @return an implementation-defined result; its meaning is not visible here
+   */
+  Object put(Object key, Object value, byte[] deltaBytes, EntryEventImpl clientEvent,
+      Operation op, boolean requireOldValue, Object expectedOldValue,
+      Object callbackArg, boolean isCreateFwd);
+
+  /**
+   * Does a region entry destroy on the server
+   * @param key the entry key to do the destroy on
+   * @param expectedOldValue the value that must be associated with the entry, or null
+   * @param operation the operation being performed (Operation.DESTROY, Operation.REMOVE)
+   * @param clientEvent the client event, if any, for version propagation
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   * @return an implementation-defined result; its meaning is not visible here
+   */
+  Object destroy(Object key, Object expectedOldValue,
+      Operation operation, EntryEventImpl clientEvent, Object callbackArg);
+
+  /**
+   * Does a region entry invalidate on the server
+   * @param event the entryEventImpl that represents the invalidate
+   */
+  void invalidate(EntryEventImpl event);
+
+  /**
+   * Does a region clear on the server
+   * @param eventId the event id for this clear
+   * @param callbackArg an optional callback arg to pass to any cache callbacks
+   */
+  void clear(EventID eventId, Object callbackArg);
+
+  /**
+   * Does a region containsKey on a server
+   * @param key the entry key to do the containsKey on
+   */
+  boolean containsKey(Object key);
+
+  /**
+   * Does a region containsValueForKey on a server
+   * @param key the entry key to do the containsValueForKey on
+   */
+  boolean containsValueForKey(Object key);
+
+  /**
+   * Does a region containsValue on a server
+   * @param value the entry value to search for
+   */
+  boolean containsValue(Object value);
+
+  /**
+   * Does a region keySet on a server
+   */
+  Set keySet();
+
+  /**
+   * Bulk put of the given map; the parameters are not described by the
+   * original documentation, so their exact semantics should be confirmed
+   * against implementations.
+   */
+  VersionedObjectList putAll(Map map, EventID eventId, boolean skipCallbacks, Object callbackArg);
+
+  /**
+   * Bulk removal of the given keys; semantics of the remaining parameters
+   * are not visible here - confirm against implementations.
+   */
+  VersionedObjectList removeAll(Collection<Object> keys, EventID eventId, Object callbackArg);
+
+  /**
+   * Bulk get of the given keys; semantics of the callback argument are not
+   * visible here - confirm against implementations.
+   */
+  VersionedObjectList getAll(List keys, Object callback);
+
+  /**
+   * Returns a size as an int; presumably the number of entries in the
+   * region on the server - confirm against implementations.
+   */
+  int size();
+
+  /**
+   * gets an entry from the server, does not invoke loaders
+   * @param key the key of the entry to fetch
+   * @return an {@link EntrySnapshot} for the given key
+   */
+  Entry getEntry(Object key);
+//  public boolean containsValue(Object value);
+//  public Set entries(boolean recursive) {
+//  public void invalidate(Object key) throws TimeoutException,
+//  public int size()
+
+  /**
+   * returns the name of the region to which this interface provides access
+   */
+  String getRegionName();
+
+  /**
+   * returns the region to which this interface provides access.  This may be
+   * null in an admin system
+   */
+  Region getRegion();
+
+
+}
\ No newline at end of file