Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:20 UTC

[15/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
new file mode 100755
index 0000000..f3c17b5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
@@ -0,0 +1,835 @@
+/*
+ * ========================================================================= 
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.FixedPartitionResolver;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionResolver;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.EntryOperationImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Maintains a {@link ClientPartitionAdvisor} for each partitioned region on the servers.
+ * Client operations consult this service to identify the server locations on
+ * which the data for a client operation resides.
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * 
+ * @since 6.5
+ * 
+ */
+public final class ClientMetadataService {
+
+  private static final Logger logger = LogService.getLogger();
+  
+  private final Cache cache;
+  
+  private final Set<String> nonPRs = new HashSet<String>();
+
+  private boolean HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP = Boolean.getBoolean("gemfire.PoolImpl.honourServerGroupsInPRSingleHop");
+
+  public static final int SIZE_BYTES_ARRAY_RECEIVED = 2;
+  
+  public static final int INITIAL_VERSION = 0;
+  
+  /** random number generator used in pruning */
+  private final Random rand = new Random();
+  
+  public ClientMetadataService(Cache cache) {
+    this.cache = cache;
+  }
+
+  private final Map<String, ClientPartitionAdvisor> clientPRAdvisors = new ConcurrentHashMap<String, ClientPartitionAdvisor>();
+  private final Map<String, Set<ClientPartitionAdvisor>> colocatedPRAdvisors = new ConcurrentHashMap<String, Set<ClientPartitionAdvisor>>();
+  
+  private PartitionResolver getResolver(Region r, Object key,
+      Object callbackArgument) {
+    // First choice is one associated with the region
+    final String regionFullPath = r.getFullPath();
+    ClientPartitionAdvisor advisor = this
+        .getClientPartitionAdvisor(regionFullPath);
+    PartitionResolver result = null;
+    if (advisor != null) {
+      result = advisor.getPartitionResolver();
+    }
+    
+    if (result != null) {
+      return result;
+    }
+
+    // Second is the key
+    if (key != null && key instanceof PartitionResolver) {
+      return (PartitionResolver)key;
+    }
+
+    // Third is the callback argument
+    if (callbackArgument != null
+        && callbackArgument instanceof PartitionResolver) {
+      return (PartitionResolver)callbackArgument;
+    }
+    // There is no resolver.
+    return null;
+  }
+
+  public ServerLocation getBucketServerLocation(Region region,
+      Operation operation, Object key, Object value, Object callbackArg) {
+    ClientPartitionAdvisor prAdvisor  = this.getClientPartitionAdvisor(region.getFullPath());
+    if (prAdvisor == null) {
+      return null;
+    }
+    int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
+
+    final PartitionResolver resolver = getResolver(region, key, callbackArg);
+    Object resolveKey;
+    EntryOperation entryOp = null;
+    if (resolver == null) {
+      // The client has not registered a PartitionResolver;
+      // assume the server-side PR is not using one either.
+      resolveKey = key;
+    }
+    else {
+      entryOp = new EntryOperationImpl(region, operation, key,
+          value, callbackArg);
+      resolveKey = resolver.getRoutingObject(entryOp);
+      if (resolveKey == null) {
+        throw new IllegalStateException(
+            LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
+                .toLocalizedString());
+      }
+    }
+    int bucketId;
+    if (resolver instanceof FixedPartitionResolver) {
+      if (entryOp == null) {
+        entryOp = new EntryOperationImpl(region,
+            Operation.FUNCTION_EXECUTION, key, null, null);
+      }
+      String partition = ((FixedPartitionResolver)resolver).getPartitionName(
+          entryOp, prAdvisor.getFixedPartitionNames());
+      if (partition == null) {
+        Object[] prms = new Object[] { region.getName(), resolver };
+        throw new IllegalStateException(
+            LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
+                .toLocalizedString(prms));
+      }
+      else {
+        bucketId = prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+        if (bucketId == -1) {
+          // scheduleGetPRMetaData((LocalRegion)region);
+          return null;
+        }
+
+      }
+    }else {
+      bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
+    }
+    
+    ServerLocation bucketServerLocation = getServerLocation(region, operation,
+        bucketId);
+    ServerLocation location = null;
+    if (bucketServerLocation != null)
+      location = new ServerLocation(bucketServerLocation.getHostName(),
+          bucketServerLocation.getPort());
+    return location;
+  }
+
+  private ServerLocation getServerLocation(Region region, Operation operation,
+      int bucketId) {
+    final String regionFullPath = region.getFullPath();
+    ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
+    if (prAdvisor == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", regionFullPath);
+      }
+      return null;
+    }
+    
+//    if (prAdvisor.getColocatedWith() != null) {
+//      prAdvisor = this.getClientPartitionAdvisor(prAdvisor.getColocatedWith());
+//      if (prAdvisor == null) {
+//        if (this.logger.fineEnabled()) {
+//          this.logger.fine(
+//              "ClientMetadataService#getServerLocation : Region "
+//                  + regionFullPath + "prAdvisor does not exist.");
+//        }
+//        return null;
+//      }
+//    }
+    
+    if (operation.isGet()) {
+      return prAdvisor.adviseServerLocation(bucketId);
+    }
+    else {
+      return prAdvisor.advisePrimaryServerLocation(bucketId);
+    }
+  }
+
+  public Map<ServerLocation, HashSet> getServerToFilterMap(
+      final Collection routingKeys, final Region region,
+      boolean primaryMembersNeeded) {
+    return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, false);
+  }
+  
+  public Map<ServerLocation, HashSet> getServerToFilterMap(
+      final Collection routingKeys, final Region region, boolean primaryMembersNeeded,
+      boolean bucketsAsFilter) {
+    final String regionFullPath = region.getFullPath();
+    ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
+    if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) {
+      scheduleGetPRMetaData((LocalRegion)region, false);
+      return null;
+    }
+    HashMap<Integer, HashSet> bucketToKeysMap = groupByBucketOnClientSide(
+        region, prAdvisor, routingKeys, bucketsAsFilter);
+
+    HashMap<ServerLocation, HashSet> serverToKeysMap = new HashMap<ServerLocation, HashSet>();
+    HashMap<ServerLocation, HashSet<Integer>> serverToBuckets = groupByServerToBuckets(
+        prAdvisor, bucketToKeysMap.keySet(), primaryMembersNeeded);
+    
+    if(serverToBuckets == null){
+      return null;
+    }
+    
+    for (Map.Entry entry : serverToBuckets.entrySet()) {
+      ServerLocation server = (ServerLocation)entry.getKey();
+      HashSet<Integer> buckets = (HashSet)entry.getValue();
+      for (Integer bucket : buckets) {
+        // use LinkedHashSet to maintain the order of keys
+        // the keys will be iterated several times
+        LinkedHashSet keys = (LinkedHashSet)serverToKeysMap.get(server);
+        if (keys == null) {
+          keys = new LinkedHashSet();
+        }
+        keys.addAll(bucketToKeysMap.get(bucket));
+        serverToKeysMap.put(server, keys);
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Returning server to keys map : {}", serverToKeysMap);
+    }
+
+    return serverToKeysMap;
+  }
+  
+  public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly){
+    final String regionFullPath = region.getFullPath();
+    ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
+    if (prAdvisor == null || prAdvisor.adviseRandomServerLocation() == null) {
+      scheduleGetPRMetaData((LocalRegion)region, false);
+      return null;
+    }
+    int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
+    HashSet<Integer> allBucketIds = new HashSet<Integer>();
+    for(int i =0; i < totalNumberOfBuckets; i++){
+      allBucketIds.add(i);
+    }
+    return groupByServerToBuckets(prAdvisor, allBucketIds, primaryOnly);
+  }
+  /**
+   * Builds a map from each server to the buckets it hosts. If no server is
+   * known for a bucket (because of a metadata mismatch), the bucket is
+   * assigned to an arbitrary server in the map.
+   */
+  private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets(
+      ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet,
+      boolean primaryOnly) {
+    if (primaryOnly) {
+      Set<Integer> bucketsWithoutServer = new HashSet<Integer>();
+      HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
+      for (Integer bucketId : bucketSet) {
+        ServerLocation server = prAdvisor.advisePrimaryServerLocation(bucketId);
+        if (server == null) {
+          bucketsWithoutServer.add(bucketId);          
+          continue;
+        }
+        HashSet<Integer> buckets = serverToBucketsMap.get(server);
+        if (buckets == null) {
+          buckets = new HashSet<Integer>(); // faster if this was an ArrayList
+          serverToBucketsMap.put(server, buckets);
+        }
+        buckets.add(bucketId);
+      }
+
+      if (!serverToBucketsMap.isEmpty() ) {
+        serverToBucketsMap.entrySet().iterator().next().getValue().addAll(
+            bucketsWithoutServer);
+      }
+      
+      if (logger.isDebugEnabled()) {
+        logger.debug("ClientMetadataService: The server to bucket map is : {}", serverToBucketsMap);
+      }
+
+      return serverToBucketsMap;
+    }
+    else {
+      return pruneNodes(prAdvisor, bucketSet);
+    }
+  }
+  
+  
+  private HashMap<ServerLocation, HashSet<Integer>> pruneNodes(
+      ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) {
+    
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("ClientMetadataService: The buckets to be pruned are: {}", buckets);
+    }
+    Set<Integer> bucketSetWithoutServer =  new HashSet<Integer>();
+    HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
+    HashMap<ServerLocation, HashSet<Integer>> prunedServerToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
+
+    for (Integer bucketId : buckets) {
+      List<BucketServerLocation66> serversList = prAdvisor
+          .adviseServerLocations(bucketId);
+      if (isDebugEnabled) {
+        logger.debug("ClientMetadataService: For bucketId {} the server list is {}", bucketId, serversList);
+      }
+      if ((serversList == null || serversList.size() == 0) ) {       
+        bucketSetWithoutServer.add(bucketId);        
+        continue;
+      }
+      
+      if (isDebugEnabled) {
+        logger.debug("ClientMetadataService: The buckets owners of the bucket: {} are: {}", bucketId, serversList);
+      }
+      
+      for (ServerLocation server : serversList) {
+        if (serverToBucketsMap.get(server) == null) {
+          HashSet<Integer> bucketSet = new HashSet<Integer>();
+          bucketSet.add(bucketId);
+          serverToBucketsMap.put(server, bucketSet);
+        }
+        else {
+          HashSet<Integer> bucketSet = serverToBucketsMap.get(server);
+          bucketSet.add(bucketId);
+          serverToBucketsMap.put(server, bucketSet);
+        }
+      }
+    }
+    if (isDebugEnabled) {
+      logger.debug("ClientMetadataService: The server to buckets map is : {}", serverToBucketsMap);
+    }
+
+    HashSet<Integer> currentBucketSet = new HashSet<Integer>();
+    // ServerLocation randomFirstServer =
+    // prAdvisor.adviseRandomServerLocation(); // get a random server here
+    ServerLocation randomFirstServer = null;
+    if (serverToBucketsMap.isEmpty()) {
+      return null;
+    }
+    else {
+      int size = serverToBucketsMap.size();
+      randomFirstServer = (ServerLocation)serverToBucketsMap.keySet().toArray()[rand.nextInt(size)];
+    }
+    HashSet<Integer> bucketSet = serverToBucketsMap.get(randomFirstServer);
+    if (isDebugEnabled) {
+      logger.debug("ClientMetadataService: Adding the server : {} which is random and buckets {} to prunedMap", randomFirstServer, bucketSet);
+    }
+    currentBucketSet.addAll(bucketSet);
+    prunedServerToBucketsMap.put(randomFirstServer, bucketSet);
+    serverToBucketsMap.remove(randomFirstServer);
+
+    while (!currentBucketSet.equals(buckets)) {
+      ServerLocation server = findNextServer(serverToBucketsMap.entrySet(),
+          currentBucketSet);
+      if (server == null) {
+//        HashSet<Integer> rBuckets = prunedServerToBucketsMap
+//            .get(randomFirstServer);
+//        HashSet<Integer> remainingBuckets = new HashSet<Integer>(buckets);
+//        remainingBuckets.removeAll(currentBucketSet);
+//        rBuckets.addAll(remainingBuckets);
+//        prunedServerToBucketsMap.put(randomFirstServer, rBuckets);
+        break;
+      }
+      
+      HashSet<Integer> bucketSet2 = serverToBucketsMap.get(server);
+      bucketSet2.removeAll(currentBucketSet);
+      if(bucketSet2.isEmpty()) {
+        serverToBucketsMap.remove(server);
+        continue;
+      }
+      currentBucketSet.addAll(bucketSet2);
+      prunedServerToBucketsMap.put(server, bucketSet2);
+      if (isDebugEnabled) {
+        logger.debug("ClientMetadataService: Adding the server : {} and buckets {} to prunedServer.", server, bucketSet2);
+      }
+      serverToBucketsMap.remove(server);
+    }
+    prunedServerToBucketsMap.entrySet().iterator().next().getValue().addAll(
+        bucketSetWithoutServer);
+    
+    
+    if (isDebugEnabled) {
+      logger.debug("ClientMetadataService: The final prunedServerToBucket calculated is : {}", prunedServerToBucketsMap);
+    }
+    
+    return prunedServerToBucketsMap;
+  }
+  
+  
+  private ServerLocation findNextServer(
+      Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet,
+      HashSet<Integer> currentBucketSet) {
+   
+    ServerLocation server = null;
+    int max = -1;
+    ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>(); 
+    for (Map.Entry<ServerLocation, HashSet<Integer>> entry : entrySet) {
+      HashSet<Integer> buckets = new HashSet<Integer>();
+      buckets.addAll(entry.getValue());
+      buckets.removeAll(currentBucketSet);
+
+      if (max < buckets.size()) {
+        max = buckets.size();
+        server = entry.getKey();
+        nodesOfEqualSize.clear();
+        nodesOfEqualSize.add(server);
+      }
+      else if (max == buckets.size()){
+        nodesOfEqualSize.add(entry.getKey()); // record the tied server, not the earlier max holder
+      }
+    }
+    
+    //return node;
+    Random r = new Random();
+    if(nodesOfEqualSize.size() > 0)
+      return nodesOfEqualSize.get(r.nextInt(nodesOfEqualSize.size()));
+    
+    return null; 
+  }
+  
+  private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region,
+      ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) {
+    
+    HashMap<Integer, HashSet> bucketToKeysMap = new HashMap();
+    int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
+    Iterator i = routingKeys.iterator();
+    while (i.hasNext()) {
+      Object key = i.next();      
+      int bucketId = bucketsAsFilter ? ((Integer)key).intValue() :
+        extractBucketID(region, prAdvisor, totalNumberOfBuckets, key);
+      HashSet bucketKeys = bucketToKeysMap.get(bucketId);
+      if (bucketKeys == null) {
+        bucketKeys = new HashSet(); // faster if this was an ArrayList
+        bucketToKeysMap.put(bucketId, bucketKeys);
+      }
+      bucketKeys.add(key);
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Bucket to keys map : {}", bucketToKeysMap);
+    }
+    return bucketToKeysMap;
+  }
+
+  private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor,
+      int totalNumberOfBuckets, Object key) {
+    int bucketId = -1;
+    final PartitionResolver resolver = getResolver(region, key, null);
+    Object resolveKey;
+    EntryOperation entryOp = null;
+    if (resolver == null) {
+      // The client has not registered a PartitionResolver;
+      // assume the server-side PR is not using one either.
+      resolveKey = key;
+    }
+    else {
+      entryOp = new EntryOperationImpl(region,
+          Operation.FUNCTION_EXECUTION, key, null, null);
+      resolveKey = resolver.getRoutingObject(entryOp);
+      if (resolveKey == null) {
+        throw new IllegalStateException(
+            LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
+                .toLocalizedString());
+      }
+    }
+   
+    if (resolver instanceof FixedPartitionResolver) {
+      if (entryOp == null) {
+        entryOp = new EntryOperationImpl(region,
+            Operation.FUNCTION_EXECUTION, key, null, null);
+      }
+      String partition = ((FixedPartitionResolver)resolver).getPartitionName(
+          entryOp, prAdvisor.getFixedPartitionNames());
+      if (partition == null) {
+        Object[] prms = new Object[] { region.getName(), resolver };
+        throw new IllegalStateException(
+            LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
+                .toLocalizedString(prms));
+      }
+      else {
+        bucketId =  prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+        // The bucket id can be -1 when we do not yet have information about
+        // all the partitions on the server, so proactively schedule a
+        // metadata fetch.
+        if(bucketId == -1) {
+          scheduleGetPRMetaData((LocalRegion)region, true);
+        }
+      }
+    }else{
+      bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
+    }
+    return bucketId;
+  }
+  
+  
+
+  public void scheduleGetPRMetaData(final LocalRegion region,
+      final boolean isRecursive) {
+    if(this.nonPRs.contains(region.getFullPath())){
+      return;
+    }
+    region.getCachePerfStats().incNonSingleHopsCount();
+    if (isRecursive) {
+      try {
+        getClientPRMetadata(region);
+      }
+      catch (VirtualMachineError e) {
+        SystemFailure.initiateFailure(e);
+        throw e;
+      }
+      catch (Throwable e) {
+        SystemFailure.checkFailure();
+        if (logger.isDebugEnabled()) {
+          logger.debug("An exception occurred while fetching metadata", e);
+        }
+      }
+    }
+    else {
+      Runnable fetchTask = new Runnable() {
+        @SuppressWarnings("synthetic-access")
+        public void run() {
+          try {
+            getClientPRMetadata(region);
+          }
+          catch (VirtualMachineError e) {
+            SystemFailure.initiateFailure(e);
+            throw e;
+          }
+          catch (Throwable e) {
+            SystemFailure.checkFailure();
+            if (logger.isDebugEnabled()) {
+              logger.debug("An exception occurred while fetching metadata", e);
+            }
+          }
+        }
+      };
+      SingleHopClientExecutor.submitTask(fetchTask);
+    }
+  }
+  
+  public final void getClientPRMetadata(LocalRegion region) {
+    final String regionFullPath = region.getFullPath();
+    ClientPartitionAdvisor advisor = null;
+    InternalPool pool = region.getServerProxy().getPool();
+    // Acquire the lock only if it is free; otherwise a request to fetch
+    // metadata is already in progress, so just return.
+    if (region.clientMetaDataLock.tryLock()) {
+      try {
+        advisor = this.getClientPartitionAdvisor(regionFullPath);
+        if (advisor==null) {
+          advisor = GetClientPartitionAttributesOp
+              .execute(pool, regionFullPath);
+          if(advisor == null){
+            this.nonPRs.add(regionFullPath);
+            return;
+          }
+          addClientPartitionAdvisor(regionFullPath, advisor);
+        }
+        else {
+          if(advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) {
+            ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp
+            .execute(pool, regionFullPath);
+            advisor.updateFixedPAMap(newAdvisor.getFixedPAMap());
+          }
+        }
+        String colocatedWith = advisor.getColocatedWith();
+        if (colocatedWith == null) {
+          isMetadataRefreshed_TEST_ONLY = true;
+          GetClientPRMetaDataOp.execute(pool, regionFullPath, this);
+          region.getCachePerfStats().incMetaDataRefreshCount();
+        }
+        else {
+          ClientPartitionAdvisor colocatedAdvisor = this.getClientPartitionAdvisor(colocatedWith);
+          LocalRegion leaderRegion = (LocalRegion)region.getCache()
+          .getRegion(colocatedWith);
+          if (colocatedAdvisor == null) {
+            scheduleGetPRMetaData(leaderRegion, true);
+            return;
+          }
+          else {
+            isMetadataRefreshed_TEST_ONLY = true;
+            GetClientPRMetaDataOp.execute(pool, colocatedWith, this);
+            leaderRegion.getCachePerfStats().incMetaDataRefreshCount();
+          }
+        }
+      }
+      finally {
+        region.clientMetaDataLock.unlock();
+      }
+    }
+  }
+  
+  public void scheduleGetPRMetaData(final LocalRegion region,
+      final boolean isRecursive, byte nwHopType) {
+    if(this.nonPRs.contains(region.getFullPath())){
+      return;
+    }
+    ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath());
+    if(advisor!= null && advisor.getServerGroup().length()!= 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP){
+      if (logger.isDebugEnabled()) {
+        logger.debug("Scheduling metadata refresh : {}", nwHopType);
+      }
+      if(nwHopType == (byte)2){
+        return;
+      }
+    }
+    region.getCachePerfStats().incNonSingleHopsCount();
+    if (isRecursive) {
+      try {
+        getClientPRMetadata(region);
+      } catch (VirtualMachineError e) {
+        SystemFailure.initiateFailure(e);
+        throw e;
+      } catch (Throwable e) {
+        SystemFailure.checkFailure();
+        if (logger.isDebugEnabled()) {
+          logger.debug("An exception occurred while fetching metadata", e);
+        }
+      }
+    } else {
+      Runnable fetchTask = new Runnable() {
+        @SuppressWarnings("synthetic-access")
+        public void run() {
+          try {
+            getClientPRMetadata(region);
+          } catch (VirtualMachineError e) {
+            SystemFailure.initiateFailure(e);
+            throw e;
+          } catch (Throwable e) {
+            SystemFailure.checkFailure();
+            if (logger.isDebugEnabled()) {
+              logger.debug("An exception occurred while fetching metadata", e);
+            }
+          }
+        }
+      };
+      SingleHopClientExecutor.submitTask(fetchTask);
+    }
+  }
+
+  public void removeBucketServerLocation(ServerLocation serverLocation) {
+    Set<String> keys = getAllRegionFullPaths();
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("ClientMetadataService removing a ServerLocation :{}{}", serverLocation, keys);
+    }
+    if (keys != null) {
+      for (String regionPath : keys) {
+        ClientPartitionAdvisor prAdvisor = this
+            .getClientPartitionAdvisor(regionPath);
+        if (isDebugEnabled) {
+          logger.debug("ClientMetadataService removing from {}{}", regionPath, prAdvisor);
+        }
+        if (prAdvisor != null) {
+          prAdvisor.removeBucketServerLocation(serverLocation);
+        }
+      }
+    }
+  }
+  
+  public byte getMetaDataVersion(Region region, Operation operation,
+      Object key, Object value, Object callbackArg) {
+    ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(region
+        .getFullPath());
+    if (prAdvisor == null) {
+      return 0;
+    }
+
+    int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
+
+    final PartitionResolver resolver = getResolver(region, key, callbackArg);
+    Object resolveKey;
+    EntryOperation entryOp = null;
+    if (resolver == null) {
+      // The client has not registered a PartitionResolver;
+      // assume the server-side PR is not using one either.
+      resolveKey = key;
+    }
+    else {
+      entryOp = new EntryOperationImpl(region, operation, key,
+          value, callbackArg);
+      resolveKey = resolver.getRoutingObject(entryOp);
+      if (resolveKey == null) {
+        throw new IllegalStateException(
+            LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
+                .toLocalizedString());
+      }
+    }
+    
+    int bucketId;
+    if (resolver instanceof FixedPartitionResolver) {
+      if (entryOp == null) {
+        entryOp = new EntryOperationImpl(region,
+            Operation.FUNCTION_EXECUTION, key, null, null);
+      }
+      String partition = ((FixedPartitionResolver)resolver).getPartitionName(
+          entryOp, prAdvisor.getFixedPartitionNames());
+      if (partition == null) {
+        Object[] prms = new Object[] { region.getName(), resolver };
+        throw new IllegalStateException(
+            LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
+                .toLocalizedString(prms));
+      }
+      else {
+        bucketId =  prAdvisor.assignFixedBucketId(region, partition, resolveKey);
+      }
+    }else {
+      bucketId = PartitionedRegionHelper.getHashKey(resolveKey, totalNumberOfBuckets);
+    }
+    
+    BucketServerLocation66 bsl = (BucketServerLocation66)getPrimaryServerLocation(
+        region, bucketId);
+    if (bsl == null) {
+      return 0;
+    }
+    return bsl.getVersion();
+  }
+
+  private ServerLocation getPrimaryServerLocation(Region region, int bucketId) {
+    final String regionFullPath = region.getFullPath();
+    ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
+    if (prAdvisor == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", regionFullPath);
+      }
+      return null;
+    }
+
+    if (prAdvisor.getColocatedWith() != null) {
+      prAdvisor = this.getClientPartitionAdvisor(prAdvisor.getColocatedWith());
+      if (prAdvisor == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", regionFullPath);
+        }
+        return null;
+      }
+    }
+    return prAdvisor.advisePrimaryServerLocation(bucketId);
+  }
+  
+  private void addClientPartitionAdvisor(String regionFullPath,
+      ClientPartitionAdvisor advisor) {
+    if (this.cache.isClosed() || this.clientPRAdvisors == null) {
+      return;
+    }
+    try {
+      this.clientPRAdvisors.put(regionFullPath, advisor);
+      if (advisor.getColocatedWith() != null) {
+        String parentRegionPath = advisor.getColocatedWith();
+        Set<ClientPartitionAdvisor> colocatedAdvisors = this.colocatedPRAdvisors.get(parentRegionPath);
+        if(colocatedAdvisors == null){
+          colocatedAdvisors = new CopyOnWriteArraySet<ClientPartitionAdvisor>();
+          this.colocatedPRAdvisors.put(parentRegionPath, colocatedAdvisors);
+        }
+        colocatedAdvisors.add(advisor);
+      }
+    }
+    catch (Exception npe) {
+      // ignore, shutdown case
+    }
+    
+  }
+
+  public ClientPartitionAdvisor getClientPartitionAdvisor(String regionFullPath) {
+    if (this.cache.isClosed() || this.clientPRAdvisors == null) {
+      return null;
+    }
+    ClientPartitionAdvisor prAdvisor = null;
+    try {
+      prAdvisor = this.clientPRAdvisors.get(regionFullPath);
+    }
+    catch (Exception npe) {
+      return null;
+    }
+    return prAdvisor;
+  }
+  
+  public Set<ClientPartitionAdvisor> getColocatedClientPartitionAdvisor(String regionFullPath) {
+    if (this.cache.isClosed() || this.clientPRAdvisors == null || this.colocatedPRAdvisors == null) {
+      return null;
+    }
+    return this.colocatedPRAdvisors.get(regionFullPath);
+  }
+  
+  private Set<String> getAllRegionFullPaths() {
+    if (this.cache.isClosed() || this.clientPRAdvisors == null) {
+      return null;
+    }
+    Set<String> keys  = null;
+    try {
+      keys = this.clientPRAdvisors.keySet();
+    }
+    catch (Exception npe) {
+      return null;
+    }
+    return keys;
+  }
+
+  public void close() {
+    this.clientPRAdvisors.clear();
+    this.colocatedPRAdvisors.clear();
+  }
+  
+  public boolean isRefreshMetadataTestOnly() {
+    return isMetadataRefreshed_TEST_ONLY;
+  }
+
+  public void satisfyRefreshMetadata_TEST_ONLY(boolean isRefreshMetadataTestOnly) {
+    isMetadataRefreshed_TEST_ONLY = isRefreshMetadataTestOnly;
+  }
+
+  public Map<String, ClientPartitionAdvisor> getClientPRMetadata_TEST_ONLY() {
+    return clientPRAdvisors;
+  }
+
+  public Map<String, ClientPartitionAdvisor> getClientPartitionAttributesMap() {
+    return clientPRAdvisors;
+  }
+
+  public boolean honourServerGroup(){
+    return HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP;
+  }
+  
+  private boolean isMetadataRefreshed_TEST_ONLY = false;
+
+}
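
For readers following the routing logic above: groupByServerToBuckets()/pruneNodes()/findNextServer() amount to a greedy cover of the bucket set, seeded with a random server and then repeatedly adding the server that owns the most still-uncovered buckets. Below is a minimal standalone sketch of that idea only (illustrative class and method names, a deterministic greedy start instead of the random seed, and no handling of buckets with no known server); it is not part of this commit.

    import java.util.*;

    final class GreedyBucketPruningSketch {

      // Reduce a server -> hosted-buckets map to a small set of servers that
      // together cover every bucket, preferring the server that adds the most
      // not-yet-covered buckets (the same idea as pruneNodes/findNextServer).
      static <S> Map<S, Set<Integer>> prune(Map<S, Set<Integer>> serverToBuckets) {
        Map<S, Set<Integer>> working = new HashMap<>();
        serverToBuckets.forEach((s, b) -> working.put(s, new HashSet<>(b)));

        Set<Integer> allBuckets = new HashSet<>();
        working.values().forEach(allBuckets::addAll);

        Map<S, Set<Integer>> pruned = new LinkedHashMap<>();
        Set<Integer> covered = new HashSet<>();

        while (!covered.equals(allBuckets) && !working.isEmpty()) {
          // Pick the server whose buckets add the most new coverage.
          S best = null;
          int bestGain = -1;
          for (Map.Entry<S, Set<Integer>> e : working.entrySet()) {
            Set<Integer> gain = new HashSet<>(e.getValue());
            gain.removeAll(covered);
            if (gain.size() > bestGain) {
              bestGain = gain.size();
              best = e.getKey();
            }
          }
          Set<Integer> contribution = working.remove(best);
          contribution.removeAll(covered);
          if (!contribution.isEmpty()) {
            covered.addAll(contribution);
            pruned.put(best, contribution);
          }
        }
        return pruned;
      }

      public static void main(String[] args) {
        Map<String, Set<Integer>> serverToBuckets = new HashMap<>();
        serverToBuckets.put("serverA", new HashSet<>(Arrays.asList(0, 1, 2, 3)));
        serverToBuckets.put("serverB", new HashSet<>(Arrays.asList(2, 3, 4)));
        serverToBuckets.put("serverC", new HashSet<>(Arrays.asList(4, 5)));
        // serverA covers {0,1,2,3}, then serverC adds {4,5}; serverB offers
        // nothing new once those two are chosen, so it is left out.
        System.out.println(prune(serverToBuckets));
      }
    }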

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java
new file mode 100755
index 0000000..f060b7a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java
@@ -0,0 +1,279 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.cache.FixedPartitionAttributes;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.cache.PartitionResolver;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.ClassPathLoader;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation;
+import com.gemstone.gemfire.internal.cache.FixedPartitionAttributesImpl;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Stores information about a partitioned region on the servers, such as its partition attributes and bucket-to-server metadata.
+ * 
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ * 
+ * @since 6.5
+ * 
+ */
+public class ClientPartitionAdvisor {
+
+  private final ConcurrentMap<Integer, List<BucketServerLocation66>> bucketServerLocationsMap 
+  = new ConcurrentHashMap<Integer, List<BucketServerLocation66>>();
+
+  private final int totalNumBuckets;
+
+  private String serverGroup = "";
+  
+  private final String colocatedWith;
+
+  private PartitionResolver partitionResolver = null;
+  
+  private Map<String, List<Integer>> fixedPAMap = null;
+
+  private boolean fpaAttrsCompletes = false;
+
+  @SuppressWarnings("unchecked")
+  public ClientPartitionAdvisor(int totalNumBuckets, String colocatedWith,
+      String partitionResolverName, Set<FixedPartitionAttributes> fpaSet) {
+
+    this.totalNumBuckets = totalNumBuckets;
+    this.colocatedWith = colocatedWith;
+    try {
+      if (partitionResolverName != null) {
+        this.partitionResolver = (PartitionResolver)
+            ClassPathLoader.getLatest().forName(partitionResolverName).newInstance();
+      }
+    }
+    catch (Exception e) {
+      e.printStackTrace();
+      throw new InternalGemFireException(LocalizedStrings.ClientPartitionAdvisor_CANNOT_CREATE_AN_INSTANCE_OF_PARTITION_RESOLVER_0.toLocalizedString(partitionResolverName));
+    }
+    if (fpaSet != null) {
+      fixedPAMap = new ConcurrentHashMap<String, List<Integer>>();
+      int totalFPABuckets = 0;
+      for (FixedPartitionAttributes fpa : fpaSet) {
+        List attrList = new ArrayList();
+        totalFPABuckets+=fpa.getNumBuckets();
+        attrList.add(fpa.getNumBuckets());
+        attrList.add(((FixedPartitionAttributesImpl)fpa).getStartingBucketID());
+        fixedPAMap.put(fpa.getPartitionName(), attrList);
+      }
+      if(totalFPABuckets == this.totalNumBuckets) {
+        this.fpaAttrsCompletes = true;
+      }
+    }
+  }
+
+  public ServerLocation adviseServerLocation(int bucketId) {
+    if (this.bucketServerLocationsMap.containsKey(bucketId)) {
+      List<BucketServerLocation66> locations = this.bucketServerLocationsMap
+          .get(bucketId);
+      List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(
+          locations);
+      // TODO: We need to consider Load Balancing Policy
+      if (locationsCopy.isEmpty()) {
+        return null;
+      }
+      if (locationsCopy.size() == 1) {
+        return locationsCopy.get(0);
+      }
+      int index = new Random().nextInt(locationsCopy.size()); // allow any location, including the last
+      return locationsCopy.get(index);
+    }
+    return null;
+  }
+
+  public ServerLocation adviseRandomServerLocation() {
+    ArrayList<Integer> bucketList = new ArrayList<Integer>(
+        this.bucketServerLocationsMap.keySet());
+
+    if (bucketList.size() > 0) {
+      Collections.shuffle(bucketList);
+      List<BucketServerLocation66> locations = this.bucketServerLocationsMap
+          .get(bucketList.get(0));
+
+      if (locations != null) {
+        List<BucketServerLocation66> serverList = new ArrayList<BucketServerLocation66>(
+            locations);
+        if (serverList.size() == 0)
+          return null;
+        return serverList.get(0);
+      }
+    }
+    return null;
+  }
+  
+  public List<BucketServerLocation66> adviseServerLocations(int bucketId) {
+    if (this.bucketServerLocationsMap.containsKey(bucketId)) {
+      List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(
+          this.bucketServerLocationsMap.get(bucketId));
+      return locationsCopy;
+    }
+    return null;
+  }
+  
+  public ServerLocation advisePrimaryServerLocation(int bucketId) {
+    if (this.bucketServerLocationsMap.containsKey(bucketId)) {
+      List<BucketServerLocation66> locations = this.bucketServerLocationsMap
+          .get(bucketId);
+      List<BucketServerLocation66> locationsCopy = new ArrayList<BucketServerLocation66>(
+          locations);
+      for (BucketServerLocation66 loc : locationsCopy) {
+        if (loc.isPrimary()) {
+          return loc;
+        }
+      }
+    }
+    return null;
+  }
+
+  public void updateBucketServerLocations(int bucketId,
+    List<BucketServerLocation66> bucketServerLocations, ClientMetadataService cms) {
+    List<BucketServerLocation66> locationCopy = new ArrayList<BucketServerLocation66>();
+    List<BucketServerLocation66> locations;
+    
+    boolean honourServerGroup = cms.honourServerGroup();
+
+    if (this.serverGroup.length() != 0 && honourServerGroup) {
+      for (BucketServerLocation66 s : bucketServerLocations) {
+        String[] groups = s.getServerGroups();
+        if (groups.length > 0) {
+          for (String str : groups) {
+            if (str.equals(this.serverGroup)) {
+              locationCopy.add(s);
+              break;
+            }
+          }
+        } else {
+          locationCopy.add(s);
+        }
+      }
+      locations = Collections.unmodifiableList(locationCopy);
+    } else {
+      locations = Collections.unmodifiableList(bucketServerLocations);
+    }
+    
+    this.bucketServerLocationsMap.put(bucketId, locations);
+  }
+
+  public void removeBucketServerLocation(ServerLocation serverLocation) {
+    Iterator<Map.Entry<Integer, List<BucketServerLocation66>>> iter = this.bucketServerLocationsMap
+        .entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Integer, List<BucketServerLocation66>> entry = iter.next();
+      Integer key = entry.getKey();
+      List<BucketServerLocation66> oldLocations = entry.getValue();
+      List<BucketServerLocation66> newLocations = new ArrayList<BucketServerLocation66>(
+          oldLocations);
+      // If this serverLocation is in the list, remove it and update the
+      // map with the new list.
+      while (newLocations.remove(serverLocation)
+          && !this.bucketServerLocationsMap.replace(key, oldLocations,
+              newLocations)) {
+        oldLocations = this.bucketServerLocationsMap.get(key);
+        newLocations = new ArrayList<BucketServerLocation66>(oldLocations);
+      }
+    }
+  }
+
+  public Map<Integer, List<BucketServerLocation66>> getBucketServerLocationsMap_TEST_ONLY() {
+    return this.bucketServerLocationsMap;
+  }
+
+  /**
+   * This method returns total number of buckets for a PartitionedRegion.
+   * 
+   * @return total number of buckets for a PartitionedRegion.
+   */
+  
+  public int getTotalNumBuckets() {
+    return this.totalNumBuckets;
+  }
+
+  /**
+   * @return the serverGroup
+   */
+  public String getServerGroup() {
+    return this.serverGroup;
+  }
+   
+
+  public void setServerGroup(String group) {
+    this.serverGroup = group;
+  }
+
+  /**
+   * Returns name of the colocated PartitionedRegion on CacheServer
+   */
+  public String getColocatedWith() {
+    return this.colocatedWith;
+  }
+
+  /**
+   * Returns the PartitionResolver set for custom partitioning
+   * 
+   * @return <code>PartitionResolver</code> for the PartitionedRegion
+   */
+  public PartitionResolver getPartitionResolver() {
+    return this.partitionResolver;
+  }
+
+  public Set<String> getFixedPartitionNames() {
+    return this.fixedPAMap.keySet();
+  }
+
+  public int assignFixedBucketId(Region region, String partition,
+      Object resolveKey) {
+    if (this.fixedPAMap.containsKey(partition)) {
+      List<Integer> attList = this.fixedPAMap.get(partition);
+      int hc = resolveKey.hashCode();
+      int bucketId = Math.abs(hc % (attList.get(0)));
+      int partitionBucketID = bucketId + attList.get(1);
+      return partitionBucketID;
+    }
+    else {
+      // We may not yet have all the FixedPartitionAttributes from the FPR,
+      // so do not throw an exception here; send the request to the server
+      // and update the FPA attributes instead. Any such exception should be
+      // thrown by the server, since we cannot be sure the partition is
+      // unavailable without contacting it.
+      return -1;
+    }
+  }
+  
+  public Map<String, List<Integer>> getFixedPAMap(){
+    return this.fixedPAMap;
+  }
+  
+  public void updateFixedPAMap(Map<String, List<Integer>> map) {
+    this.fixedPAMap.putAll(map);
+  }
+
+  public boolean isFPAAttrsComplete() {
+    return this.fpaAttrsCompletes;
+  }
+}
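
As a quick illustration of assignFixedBucketId() above: each fixed partition keeps a [numBuckets, startingBucketID] pair in fixedPAMap, and a key is routed to startingBucketID + abs(hashCode % numBuckets). A self-contained sketch of that arithmetic follows (the partition layout and key are made up for the example; the class is not part of this commit).

    import java.util.*;

    final class FixedBucketIdSketch {
      public static void main(String[] args) {
        // Hypothetical layout: "Q1" owns 3 buckets starting at id 0,
        // "Q2" owns 3 buckets starting at id 3, mirroring the
        // [numBuckets, startingBucketID] lists kept in fixedPAMap.
        Map<String, List<Integer>> fixedPAMap = new HashMap<>();
        fixedPAMap.put("Q1", Arrays.asList(3, 0));
        fixedPAMap.put("Q2", Arrays.asList(3, 3));

        String partition = "Q2";
        Object routingKey = "order-42";

        List<Integer> attrs = fixedPAMap.get(partition);
        int numBuckets = attrs.get(0);
        int startingBucketId = attrs.get(1);

        // Same arithmetic as assignFixedBucketId: hash into the partition's
        // own bucket range, then offset by where that partition starts.
        int bucketInPartition = Math.abs(routingKey.hashCode() % numBuckets);
        int bucketId = bucketInPartition + startingBucketId; // 3, 4 or 5 for "Q2"

        System.out.println(partition + " -> bucket " + bucketId);
      }
    }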

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java
new file mode 100644
index 0000000..7526ff7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java
@@ -0,0 +1,262 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CustomExpiry;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.client.ClientRegionFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.compression.Compressor;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.UserSpecifiedRegionAttributes;
+
+/**
+ * The distributed system will always default to a loner on a client.
+ * 
+ * @author darrel
+ * @since 6.5
+ */
+
+public class ClientRegionFactoryImpl<K,V> implements ClientRegionFactory<K,V>
+{
+  private final AttributesFactory<K,V> attrsFactory;
+  private final GemFireCacheImpl cache;
+
+  /**
+   * Constructs a ClientRegionFactory by creating a DistributedSystem and a Cache. If
+   * no DistributedSystem exists it creates a loner DistributedSystem,
+   * otherwise it uses the existing DistributedSystem.
+   * A default pool will be used unless ...
+   * The Region
+   * configuration is initialized using the given region shortcut.
+   *
+   * @param pra
+   *          the region shortcut to use
+   */
+  public ClientRegionFactoryImpl(GemFireCacheImpl cache, ClientRegionShortcut pra) {
+    this.cache = cache;
+    RegionAttributes ra = cache.getRegionAttributes(pra.toString());
+    if (ra == null) {
+      throw new IllegalStateException("The region shortcut " + pra
+                                      + " has been removed.");
+    }
+    this.attrsFactory = new AttributesFactory<K,V>(ra);
+    initAttributeFactoryDefaults();
+  }
+
+  /**
+   * Constructs a ClientRegionFactory by creating a DistributedSystem and a Cache. If
+   * no DistributedSystem exists it creates a loner DistributedSystem,
+   * otherwise it uses the existing DistributedSystem.
+   * A default pool will be used unless ...
+   * The region configuration is initialized using a region attributes
+   * whose name was given as the refid.
+   *
+   * @param refid
+   *          the name of the region attributes to use
+   */
+  public ClientRegionFactoryImpl(GemFireCacheImpl cache, String refid) {
+    this.cache = cache;
+    RegionAttributes ra = cache.getRegionAttributes(refid);
+    if (ra == null) {
+      throw new IllegalStateException("The named region attributes \"" + refid
+                                      + "\" has not been defined.");
+    }
+    this.attrsFactory = new AttributesFactory<K,V>(ra);
+    initAttributeFactoryDefaults();
+  }
+
+  private void initAttributeFactoryDefaults() {
+    this.attrsFactory.setScope(Scope.LOCAL);
+    this.attrsFactory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+//    this.attrsFactory.setIgnoreJTA(true);  in 6.6 and later releases client regions support JTA
+  }
+  
+  /**
+   * Returns the cache used by this factory.
+   */
+  private GemFireCacheImpl getCache() {
+    return this.cache;
+  }
+
+  private Pool getDefaultPool() {
+    return getCache().getDefaultPool();
+  }
+
+  public ClientRegionFactory<K,V> addCacheListener(CacheListener<K,V> aListener)
+  {
+    this.attrsFactory.addCacheListener(aListener);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> initCacheListeners(CacheListener<K,V>[] newListeners)
+  {
+    this.attrsFactory.initCacheListeners(newListeners);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setEvictionAttributes(EvictionAttributes evictionAttributes) {
+    this.attrsFactory.setEvictionAttributes(evictionAttributes);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setEntryIdleTimeout(ExpirationAttributes idleTimeout)
+  {
+    this.attrsFactory.setEntryIdleTimeout(idleTimeout);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setCustomEntryIdleTimeout(CustomExpiry<K,V> custom) {
+    this.attrsFactory.setCustomEntryIdleTimeout(custom);
+    return this;
+  }
+  
+  public ClientRegionFactory<K,V> setEntryTimeToLive(ExpirationAttributes timeToLive)
+  {
+    this.attrsFactory.setEntryTimeToLive(timeToLive);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setCustomEntryTimeToLive(CustomExpiry<K,V> custom) {
+    this.attrsFactory.setCustomEntryTimeToLive(custom);
+    return this;
+  }
+  
+  public ClientRegionFactory<K,V> setRegionIdleTimeout(ExpirationAttributes idleTimeout)
+  {
+    this.attrsFactory.setRegionIdleTimeout(idleTimeout);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setRegionTimeToLive(ExpirationAttributes timeToLive)
+  {
+    this.attrsFactory.setRegionTimeToLive(timeToLive);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setKeyConstraint(Class<K> keyConstraint)
+  {
+    this.attrsFactory.setKeyConstraint(keyConstraint);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setValueConstraint(Class<V> valueConstraint)
+  {
+    this.attrsFactory.setValueConstraint(valueConstraint);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setInitialCapacity(int initialCapacity)
+  {
+    this.attrsFactory.setInitialCapacity(initialCapacity);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setLoadFactor(float loadFactor)
+  {
+    this.attrsFactory.setLoadFactor(loadFactor);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setConcurrencyLevel(int concurrencyLevel)
+  {
+    this.attrsFactory.setConcurrencyLevel(concurrencyLevel);
+    return this;
+  }
+
+  public void setConcurrencyChecksEnabled(boolean concurrencyChecksEnabled) {
+    this.attrsFactory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
+  }
+
+  public ClientRegionFactory<K,V> setDiskStoreName(String name) {
+    this.attrsFactory.setDiskStoreName(name);
+    return this;
+  }
+  
+  public ClientRegionFactory<K,V> setDiskSynchronous(boolean isSynchronous)
+  {
+    this.attrsFactory.setDiskSynchronous(isSynchronous);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setStatisticsEnabled(boolean statisticsEnabled)
+  {
+    this.attrsFactory.setStatisticsEnabled(statisticsEnabled);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setCloningEnabled(boolean cloningEnable) {
+    this.attrsFactory.setCloningEnabled(cloningEnable);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setPoolName(String poolName) {
+    this.attrsFactory.setPoolName(poolName);
+    return this;
+  }  
+  
+  @Override
+  public ClientRegionFactory<K, V> setCompressor(Compressor compressor) {
+    this.attrsFactory.setCompressor(compressor);
+    return this;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Region<K,V> create(String name) throws RegionExistsException {
+    return getCache().basicCreateRegion(name, createRegionAttributes());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Region<K,V> createSubregion(Region<?,?> parent, String name) throws RegionExistsException {
+    return ((LocalRegion)parent).createSubregion(name, createRegionAttributes());
+  }
+  
+  @SuppressWarnings("deprecation")
+  private RegionAttributes<K,V> createRegionAttributes() {
+    RegionAttributes<K,V> ra = this.attrsFactory.create();
+    if (ra.getPoolName() == null || "".equals(ra.getPoolName())) {
+      UserSpecifiedRegionAttributes<K, V> ura = (UserSpecifiedRegionAttributes<K, V>)ra;
+      if (ura.requiresPoolName) {
+        Pool dp = getDefaultPool();
+        if (dp != null) {
+          this.attrsFactory.setPoolName(dp.getName());
+          ra = this.attrsFactory.create();
+        } else {
+          throw new IllegalStateException("The poolName must be set on a client.");
+        }
+      }
+    }
+    return ra;
+  }
+
+  //  public ClientRegionFactory<K, V> addParallelGatewaySenderId(
+//      String parallelGatewaySenderId) {
+//    this.attrsFactory.addParallelGatewaySenderId(parallelGatewaySenderId);
+//    return this;
+//  }
+//
+//  public ClientRegionFactory<K, V> addSerialGatewaySenderId(
+//      String serialGatewaySenderId) {
+//    this.attrsFactory.addSerialGatewaySenderId(serialGatewaySenderId);
+//    return this;
+//  }
+  
+}
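
ClientRegionFactoryImpl sits behind the public ClientRegionFactory API obtained from a ClientCache. A minimal client-side usage sketch (the locator host/port and region name are assumptions for the example, and the region must also be defined on the servers):

    import com.gemstone.gemfire.cache.Region;
    import com.gemstone.gemfire.cache.client.ClientCache;
    import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    import com.gemstone.gemfire.cache.client.ClientRegionShortcut;

    public class ClientRegionExample {
      public static void main(String[] args) {
        // Connect to a running locator; the default pool is built from it.
        ClientCache cache = new ClientCacheFactory()
            .addPoolLocator("localhost", 10334)   // assumed locator endpoint
            .create();

        // PROXY keeps no local state, so every operation is forwarded to the
        // servers through the pool configured above.
        Region<String, String> region = cache
            .<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
            .create("exampleRegion");

        region.put("key1", "value1");
        System.out.println(region.get("key1"));

        cache.close();
      }
    }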

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java
new file mode 100644
index 0000000..4333768
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java
@@ -0,0 +1,27 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+/**
+ * @author dsmith
+ *
+ */
+public interface ClientUpdater {
+
+  void close();
+  
+  boolean isAlive();
+
+  void join(long wait) throws InterruptedException;
+  
+  public void setFailedUpdater(ClientUpdater failedUpdater);
+  
+  public boolean isProcessing();
+  
+  public boolean isPrimary();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
new file mode 100644
index 0000000..269a354
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
@@ -0,0 +1,86 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tell a server that a connection is being closed
+ * @author darrel
+ * @since 5.7
+ */
+public class CloseConnectionOp {
+  /**
+   * Tell a server that a connection is being closed
+   * @param con the connection that is being closed
+   * @param keepAlive whether to keep the proxy alive on the server
+   */
+  public static void execute(Connection con, boolean keepAlive)
+    throws Exception
+  {
+    AbstractOp op = new CloseConnectionOpImpl(keepAlive);
+    con.execute(op);
+  }
+                                                               
+  private CloseConnectionOp() {
+    // no instances allowed
+  }
+  
+  private static class CloseConnectionOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public CloseConnectionOpImpl(boolean keepAlive)  {
+      super(MessageType.CLOSE_CONNECTION, 1);
+      getMessage().addRawPart(new byte[]{(byte)(keepAlive?1:0)}, false);
+    }
+    @Override  
+    protected Message createResponseMessage() {
+      // no response is sent
+      return null;
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    @Override  
+    protected Object processResponse(Message msg) throws Exception {
+      throw new IllegalStateException("should never be called");
+    }
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startCloseCon();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endCloseConSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endCloseCon(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
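
As a usage sketch of the operation above: assuming a live Connection obtained from the pool, a caller could notify the server and then release local resources roughly as follows. The helper class and method are hypothetical; in the real client, Connection.close(boolean) is the usual entry point for a graceful close.

    package com.gemstone.gemfire.cache.client.internal;

    // Illustrative sketch only, not part of this commit.
    final class CloseConnectionSketch {
      static void notifyServerAndRelease(Connection conn, boolean keepAlive) {
        try {
          // tell the server this connection is going away; keepAlive controls
          // whether the server keeps the client's subscription proxy
          CloseConnectionOp.execute(conn, keepAlive);
        } catch (Exception e) {
          // the server could not be notified; fall through and clean up anyway
        } finally {
          // reclaim the local socket and buffers
          conn.destroy();
        }
      }
    }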

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
new file mode 100644
index 0000000..0b2cc6c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
@@ -0,0 +1,100 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a commit on a server
+ * @author gregp
+ * @since 6.6
+ */
+public class CommitOp {
+  /**
+   * Does a commit on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   */
+  public static TXCommitMessage execute(ExecutablePool pool,int txId)
+  {
+    CommitOpImpl op = new CommitOpImpl(txId);
+    pool.execute(op);
+    return op.getTXCommitMessageResponse();
+  }
+                                                               
+  private CommitOp() {
+    // no instances allowed
+  }
+  
+    
+  private static class CommitOpImpl extends AbstractOp {
+    private int txId;
+    
+    private TXCommitMessage tXCommitMessageResponse = null;
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public CommitOpImpl(int txId) {
+      super(MessageType.COMMIT, 1);
+      getMessage().setTransactionId(txId);
+      this.txId = txId;
+    }
+
+    public TXCommitMessage getTXCommitMessageResponse() {
+      return tXCommitMessageResponse;
+    }
+    
+    @Override
+    public String toString() {
+      return "TXCommit(txId="+this.txId+")";
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      TXCommitMessage rcs = (TXCommitMessage)processObjResponse(msg, "commit");
+      assert rcs != null : "TxCommit response was null";
+      this.tXCommitMessageResponse = rcs;
+      return rcs;
+    }
+     
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }    
+    
+    @Override  
+    protected boolean isErrorResponse(int msgType) {
+      return msgType == MessageType.EXCEPTION;
+    }
+    @Override  
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startCommit();
+    }
+    @Override  
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endCommitSend(start, hasFailed());
+    }
+    @Override  
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endCommit(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
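
Usage sketch: committing a client transaction through this op is a single blocking call that returns the server's TXCommitMessage. The pool reference and transaction id are assumed to come from the client's transaction bookkeeping, which is outside this diff.

    package com.gemstone.gemfire.cache.client.internal;

    import com.gemstone.gemfire.internal.cache.TXCommitMessage;

    // Illustrative sketch only, not part of this commit.
    final class CommitSketch {
      static TXCommitMessage commitTransaction(ExecutablePool pool, int txId) {
        // sends a MessageType.COMMIT for txId on one of the pool's connections;
        // CommitOpImpl.processResponse asserts the response is a non-null
        // TXCommitMessage, so a null return is not expected here
        return CommitOp.execute(pool, txId);
      }
    }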

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java
new file mode 100644
index 0000000..99bf77f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java
@@ -0,0 +1,75 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+
+/**
+ * Represents a connection from a client to a server.
+ * Instances are created, kept, and used by {@link PoolImpl}.
+ * @author darrel
+ * @since 5.7
+ */
+public interface Connection {
+  public static final long DEFAULT_CONNECTION_ID = 26739;
+  
+  public Socket getSocket();
+  public ByteBuffer getCommBuffer();
+  public ConnectionStats getStats();
+  /**
+   * Forcefully close the resources used by this connection.
+   * This should be called if the connection or the server dies.
+   */
+  public void destroy();
+
+  /**
+   * Return true if this connection has been destroyed
+   */
+  public boolean isDestroyed();
+
+  /**
+   * Gracefully close the connection by notifying 
+   * the server. It is not necessary to call destroy
+   * after closing the connection.
+   * @param keepAlive whether to keep the server-to-client connection
+   * proxy on this server alive.
+   * @throws Exception if there was an error notifying the server;
+   * the connection will still be destroyed.
+   */
+  public void close(boolean keepAlive) throws Exception;
+  
+  public ServerLocation getServer();
+  
+  public Endpoint getEndpoint();
+  
+  public ServerQueueStatus getQueueStatus();
+  
+  public Object execute(Op op) throws Exception;
+
+  public void emergencyClose();
+  
+  public short getWanSiteVersion();
+   
+  public void setWanSiteVersion(short wanSiteVersion);
+  
+  public int getDistributedSystemId();
+  
+  public OutputStream getOutputStream();
+  
+  public InputStream getInputStream();
+
+  public void setConnectionID(long id);
+
+  public long getConnectionID();
+}
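
To make the intended lifecycle concrete, a sketch follows: execute an Op on the connection, then prefer the graceful close and fall back to destroy() only if the server cannot be notified. The wrapper class and its arguments are hypothetical; only the Connection and Op types come from this diff.

    package com.gemstone.gemfire.cache.client.internal;

    // Illustrative sketch only, not part of this commit.
    final class ConnectionLifecycleSketch {
      static Object executeThenClose(Connection conn, Op op) throws Exception {
        try {
          // run a single client/server operation on this connection
          return conn.execute(op);
        } finally {
          if (!conn.isDestroyed()) {
            try {
              // graceful shutdown: notify the server and ask it to keep the
              // client's subscription proxy; per the close(boolean) javadoc,
              // no destroy() is needed after a successful close
              conn.close(true);
            } catch (Exception e) {
              // the server could not be notified; reclaim resources forcefully
              conn.destroy();
            }
          }
        }
      }
    }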

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java
new file mode 100644
index 0000000..390db3a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java
@@ -0,0 +1,59 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Set;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * A factory for creating new connections.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface ConnectionFactory {
+
+  /**
+   * Create a client to server connection to the given server
+   * @param location the server to connect to
+   * @param forQueue whether this connection will be used for a subscription queue
+   * @return a connection to that server, or null if 
+   * a connection could not be established.
+   * @throws GemFireSecurityException if there was a security exception
+   * while trying to establish a connection.
+   */
+  Connection createClientToServerConnection(ServerLocation location, boolean forQueue)
+    throws GemFireSecurityException;
+
+  /**
+   * Returns the best server for this client to connect to.
+   * Returns null if no servers exist.
+   * @param currentServer if non-null then we are trying to replace a connection
+   *                      that we have to this server.
+   * @param excludedServers the list of servers
+   * to skip over when finding a server to connect to
+   */
+  ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers);
+
+  /**
+   * Create a client to server connection to any server
+   * that is not in the excluded list.
+   * @param excludedServers the list of servers
+   * to skip over when finding a server to connect to
+   * @return a connection or null if 
+   * a connection could not be established.
+   * @throws GemFireSecurityException if there was a security exception
+   * trying to establish a connection.
+   */
+  Connection createClientToServerConnection(Set excludedServers) throws GemFireSecurityException;
+  
+  ClientUpdater createServerToClientConnection(Endpoint endpoint, QueueManager qManager, boolean isPrimary, ClientUpdater failedUpdater);
+  
+  ServerBlackList getBlackList();
+}
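
A sketch of how a pool-like caller might drive this factory: try the preferred server first, then fall back to any reachable server. The control flow here is deliberately simpler than the real pool's; only the factory methods, Connection, and ServerLocation come from this diff.

    package com.gemstone.gemfire.cache.client.internal;

    import java.util.HashSet;
    import java.util.Set;

    import com.gemstone.gemfire.distributed.internal.ServerLocation;
    import com.gemstone.gemfire.security.GemFireSecurityException;

    // Illustrative sketch only, not part of this commit.
    final class ConnectionFactoryUsageSketch {
      static Connection connectAnywhere(ConnectionFactory factory)
          throws GemFireSecurityException {
        Set excludedServers = new HashSet();
        // null currentServer means we are not replacing an existing connection
        ServerLocation best = factory.findBestServer(null, excludedServers);
        Connection conn = null;
        if (best != null) {
          conn = factory.createClientToServerConnection(best, false);
        }
        if (conn == null) {
          // fall back to any server the factory can find and reach
          conn = factory.createClientToServerConnection(excludedServers);
        }
        return conn; // may be null if no server could be reached
      }
    }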

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java
new file mode 100644
index 0000000..ae4b851
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java
@@ -0,0 +1,305 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.GatewayConfigurationException;
+import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.FailureTracker;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.PoolCancelledException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientUpdater;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * Creates connections, using a connection source to determine
+ * which server to connect to.
+ * @author dsmith
+ * @since 5.7
+ * 
+ */
+public class ConnectionFactoryImpl implements ConnectionFactory {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  //TODO  - the handshake holds state. It seems like the code depends 
+  //on all of the handshake operations happening in a single thread. I don't think we
+  //want that, need to refactor.
+  private final HandShake handshake;
+  private final int socketBufferSize;
+  private final int handShakeTimeout;
+  private final boolean usedByGateway;
+  private final ServerBlackList blackList;
+  private final CancelCriterion cancelCriterion;
+  private ConnectionSource source;
+  private int readTimeout;
+  private DistributedSystem ds;
+  private EndpointManager endpointManager;
+  private GatewaySender gatewaySender;
+  private PoolImpl pool;
+  
+  /**
+   * Test hook for client version support
+   * @since 5.7
+   */
+  
+  public static boolean testFailedConnectionToServer = false;
+    
+  public ConnectionFactoryImpl(ConnectionSource source,
+      EndpointManager endpointManager, DistributedSystem sys,
+      int socketBufferSize, int handShakeTimeout, int readTimeout,
+      ClientProxyMembershipID proxyId, CancelCriterion cancelCriterion,
+      boolean usedByGateway, GatewaySender sender,long pingInterval,
+      boolean multiuserSecureMode, PoolImpl pool) {
+    this.handshake = new HandShake(proxyId, sys);
+    this.handshake.setClientReadTimeout(readTimeout);
+    this.source = source;
+    this.endpointManager = endpointManager;
+    this.ds = sys;
+    this.socketBufferSize = socketBufferSize;
+    this.handShakeTimeout = handShakeTimeout;
+    this.handshake.setMultiuserSecureMode(multiuserSecureMode);
+    this.readTimeout = readTimeout;
+    this.usedByGateway = usedByGateway;
+    this.gatewaySender = sender;
+    this.blackList = new ServerBlackList(pingInterval);
+    this.cancelCriterion = cancelCriterion;
+    this.pool = pool;
+  }
+  
+  public void start(ScheduledExecutorService background) {
+    blackList.start(background);
+  }
+
+  private byte getCommMode(boolean forQueue) {
+    if (this.usedByGateway || (this.gatewaySender != null)) {
+      return Acceptor.GATEWAY_TO_GATEWAY;
+    } else if(forQueue) {
+      return Acceptor.CLIENT_TO_SERVER_FOR_QUEUE;
+    } else {
+      return Acceptor.CLIENT_TO_SERVER;
+    }
+  }
+  
+  public ServerBlackList getBlackList() { 
+    return blackList;
+  }
+  
+  public Connection createClientToServerConnection(ServerLocation location, boolean forQueue)  throws GemFireSecurityException {
+    ConnectionImpl connection = new ConnectionImpl(this.ds, this.cancelCriterion);
+    FailureTracker failureTracker = blackList.getFailureTracker(location);
+    
+    boolean initialized = false;
+    
+    try {
+      HandShake connHandShake = new HandShake(handshake);
+      connection.connect(endpointManager, location, connHandShake,
+                         socketBufferSize, handShakeTimeout, readTimeout, getCommMode(forQueue), this.gatewaySender);
+      failureTracker.reset();
+      connection.setHandShake(connHandShake);
+      authenticateIfRequired(connection);
+      initialized = true;
+    } catch(CancelException e) {
+      //propagate this up, don't retry
+      throw e;
+    } catch(GemFireSecurityException e) {
+      //propagate this up, don't retry
+      throw e;
+    } catch(GatewayConfigurationException e) {
+      //propagate this up, don't retry
+      throw e;
+    } catch(ServerRefusedConnectionException src) {
+      //propagate this up, don't retry
+      logger.warn(LocalizedMessage.create(LocalizedStrings.AutoConnectionSourceImpl_COULD_NOT_CREATE_A_NEW_CONNECTION_TO_SERVER_0, src.getMessage()));
+      testFailedConnectionToServer = true;
+      throw src;
+    } catch (Exception e) {
+      if (e.getMessage() != null &&
+          (e.getMessage().equals("Connection refused")
+           || e.getMessage().equals("Connection reset"))) { // this is the most common case, so don't print an exception
+        if (logger.isDebugEnabled()) {
+          logger.debug("Unable to connect to {}: connection refused", location);
+        }
+      } else {//print a warning with the exception stack trace
+        logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectException_COULD_NOT_CONNECT_TO_0, location), e);
+      }
+      testFailedConnectionToServer = true;
+    } finally {
+      if(!initialized) {
+        connection.destroy();
+        failureTracker.addFailure();
+        connection = null;
+      }
+    }
+    
+    return connection;
+  }
+
+  private void authenticateIfRequired(Connection conn) {
+    cancelCriterion.checkCancelInProgress(null);
+    if (!pool.isUsedByGateway() && !pool.getMultiuserAuthentication()) {
+      if (conn.getServer().getRequiresCredentials()) {
+        if (conn.getServer().getUserId() == -1) {
+          Long uniqueID = (Long)AuthenticateUserOp.executeOn(conn, pool);
+          conn.getServer().setUserId(uniqueID);
+          if (logger.isDebugEnabled()) {
+            logger.debug("CFI.authenticateIfRequired() Completed authentication on {}", conn);
+          }
+        }
+      }
+    }
+  }
+
+  public ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers) {
+    if (currentServer != null && source.isBalanced()) {
+      return currentServer;
+    }
+    final Set origExcludedServers = excludedServers;
+    excludedServers = new HashSet(excludedServers);
+    Set blackListedServers = blackList.getBadServers();  
+    excludedServers.addAll(blackListedServers);
+    ServerLocation server = source.findReplacementServer(currentServer, excludedServers);
+    if (server == null) {
+      // Nothing worked! Let's try without the blacklist.
+      if (excludedServers.size() > origExcludedServers.size()) {
+        // We had some servers blacklisted, so let's give this another whirl.
+        server = source.findReplacementServer(currentServer, origExcludedServers);
+      }
+    }
+    if (server == null && logger.isDebugEnabled()) {
+      logger.debug("Source was unable to findForReplacement any servers");
+    }
+    return server;
+  }
+  
+  public Connection createClientToServerConnection(Set excludedServers) throws GemFireSecurityException {
+    final Set origExcludedServers = excludedServers;
+    excludedServers = new HashSet(excludedServers);
+    Set blackListedServers = blackList.getBadServers();  
+    excludedServers.addAll(blackListedServers);
+    Connection conn = null;
+//    long startTime = System.currentTimeMillis();
+    RuntimeException fatalException = null;
+    boolean tryBlackList = true;
+    
+    do {
+      ServerLocation server = source.findServer(excludedServers);
+      if(server == null) {
+        
+        if(tryBlackList) {
+          // Nothing worked! Let's try without the blacklist.
+          tryBlackList = false;
+          int size = excludedServers.size();
+          excludedServers.removeAll(blackListedServers);
+          // make sure we didn't remove any of the ones that the caller set not to use
+          excludedServers.addAll(origExcludedServers);
+          if(excludedServers.size()<size) {
+            // We were able to remove some exclusions, so let's give this another whirl.
+            continue;
+          }
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Source was unable to locate any servers");
+        }
+        if(fatalException!=null) {
+          throw fatalException;
+        }
+        return null;
+      }
+    
+      try {
+        conn = createClientToServerConnection(server, false);
+      } catch(CancelException e) {
+        //propagate this up immediately
+        throw e;
+      } catch(GemFireSecurityException e) {
+        //propagate this up immediately
+        throw e; 
+      } catch(GatewayConfigurationException e) {
+        //propagate this up immediately
+        throw e;
+      } catch(ServerRefusedConnectionException srce) {
+        fatalException = srce;
+        if (logger.isDebugEnabled()) {
+          logger.debug("ServerRefusedConnectionException attempting to connect to {}", server , srce);
+        }
+      } catch (Exception e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectException_COULD_NOT_CONNECT_TO_0, server), e);
+      }
+      
+      excludedServers.add(server);
+    } while(conn == null);
+      
+//    if(conn == null) {
+//      logger.fine("Unable to create a connection in the allowed time.");
+//      
+//      if(fatalException!=null) {
+//        throw fatalException;
+//      }
+//    }
+    return conn;
+  }
+  
+  public ClientUpdater createServerToClientConnection(Endpoint endpoint,
+      QueueManager qManager, boolean isPrimary, ClientUpdater failedUpdater) {
+    String clientUpdateName = CacheClientUpdater.CLIENT_UPDATER_THREAD_NAME
+        + " on " + endpoint.getMemberId() + " port " + endpoint.getLocation().getPort();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Establishing: {}", clientUpdateName);
+    }
+//  Launch the thread
+    CacheClientUpdater updater = new CacheClientUpdater(clientUpdateName,
+        endpoint.getLocation(), isPrimary, ds, new HandShake(this.handshake), qManager,
+        endpointManager, endpoint, handShakeTimeout);
+    
+    if(!updater.isConnected()) {
+      return null;
+    }
+    
+    updater.setFailedUpdater(failedUpdater);
+    updater.start();
+
+//  Wait for the client update thread to be ready
+//    if (!updater.waitForInitialization()) {
+      // Yogesh : This doesn't wait for notify if the updater thread exits
+      // its run method because of an Exception in the CCU thread.
+      // Yogesh : fix for 36690
+      // When the CCU thread gets a ConnectException it comes out of its run method,
+      // and once a thread is no longer running the JVM notifies all threads waiting on the thread object.
+      // So the wait above returns regardless of whether the CCU thread called notify after an exception.
+      // To avoid this problem we check isAlive before returning from this method.
+//      if (logger != null && logger.infoEnabled()) {
+//        logger.info(LocalizedStrings.AutoConnectionSourceImpl_0_NOT_STARTED_1, new Object[] {this, clientUpdateName});
+//      }
+//      return null;
+//    }else {
+//      if (logger != null && logger.infoEnabled()) {
+//        logger.info(LocalizedStrings.AutoConnectionSourceImpl_0_STARTED_1, new Object[] {this, clientUpdateName});
+//      }
+//    }
+    return updater;
+  }
+}
+
+
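
Both findBestServer and createClientToServerConnection(Set) above apply the same fallback: first exclude the caller's servers plus the current blacklist, and only if nothing is found retry with the caller's exclusions alone. A condensed, hypothetical restatement of that pattern follows; ConnectionSource.findServer and ServerBlackList.getBadServers are the only calls taken from this diff.

    package com.gemstone.gemfire.cache.client.internal;

    import java.util.HashSet;
    import java.util.Set;

    import com.gemstone.gemfire.distributed.internal.ServerLocation;

    // Illustrative sketch only, not part of this commit.
    final class BlackListFallbackSketch {
      static ServerLocation pickServer(ConnectionSource source,
          ServerBlackList blackList, Set callerExcluded) {
        Set excluded = new HashSet(callerExcluded);
        excluded.addAll(blackList.getBadServers());
        ServerLocation server = source.findServer(excluded);
        if (server == null && excluded.size() > callerExcluded.size()) {
          // some servers were excluded only because they were blacklisted,
          // so give those a second chance
          server = source.findServer(callerExcluded);
        }
        return server;
      }
    }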