You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:20 UTC
[15/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
new file mode 100755
index 0000000..f3c17b5
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
@@ -0,0 +1,835 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.FixedPartitionResolver;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionResolver;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.EntryOperationImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Maintains a {@link ClientPartitionAdvisor} for each Partitioned Region hosted on the servers.
+ * Client operations consult this service to identify the server locations
+ * on which the data for the operation resides.
+ *
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ *
+ * @since 6.5
+ *
+ */
+public final class ClientMetadataService {
+
+ private static final Logger logger = LogService.getLogger();
+
+ /** Cache handed to the constructor; advisor lookups first check whether it is closed. */
+ private final Cache cache;
+
+ /**
+  * Full paths of regions for which the server returned no partition
+  * attributes; metadata fetches are skipped for them. The set is read and
+  * written both by caller threads and by fetch tasks handed to
+  * SingleHopClientExecutor, so it must be a thread-safe implementation.
+  */
+ private final Set<String> nonPRs = new CopyOnWriteArraySet<String>();
+
+ /** Read once, at construction, from the system property of the same name. */
+ private final boolean HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP = Boolean.getBoolean("gemfire.PoolImpl.honourServerGroupsInPRSingleHop");
+
+ public static final int SIZE_BYTES_ARRAY_RECEIVED = 2;
+
+ public static final int INITIAL_VERSION = 0;
+
+ /** random number generator used in pruning */
+ private final Random rand = new Random();
+
+ /**
+  * Creates a metadata service bound to the given cache.
+  *
+  * @param cache the cache whose closed state gates all advisor lookups
+  */
+ public ClientMetadataService(Cache cache) {
+   this.cache = cache;
+ }
+
+ /** Client-side advisors keyed by region full path. */
+ private final Map<String, ClientPartitionAdvisor> clientPRAdvisors = new ConcurrentHashMap<String, ClientPartitionAdvisor>();
+ /** For each parent region path, the advisors of the regions colocated with it. */
+ private final Map<String, Set<ClientPartitionAdvisor>> colocatedPRAdvisors = new ConcurrentHashMap<String, Set<ClientPartitionAdvisor>>();
+
+ /**
+  * Chooses the {@link PartitionResolver} for an operation. Candidates are
+  * tried in order: the resolver held by the region's client-side advisor,
+  * then the key itself, then the callback argument.
+  *
+  * @return the first candidate that is a resolver, or null if there is none
+  */
+ private PartitionResolver getResolver(Region r, Object key,
+     Object callbackArgument) {
+   // 1. Resolver recorded in the advisor for this region, when there is one.
+   final ClientPartitionAdvisor regionAdvisor = getClientPartitionAdvisor(r.getFullPath());
+   if (regionAdvisor != null) {
+     final PartitionResolver fromAdvisor = regionAdvisor.getPartitionResolver();
+     if (fromAdvisor != null) {
+       return fromAdvisor;
+     }
+   }
+
+   // 2. The key may itself implement PartitionResolver (instanceof is false for null).
+   if (key instanceof PartitionResolver) {
+     return (PartitionResolver)key;
+   }
+
+   // 3. Finally the callback argument; otherwise no resolver is available.
+   return (callbackArgument instanceof PartitionResolver)
+       ? (PartitionResolver)callbackArgument
+       : null;
+ }
+
+ /**
+  * Determines which server should receive a single-key operation.
+  *
+  * @return a fresh {@link ServerLocation} (host and port only) for the bucket
+  *         the key maps to, or null when no advisor exists for the region, the
+  *         fixed-partition bucket id cannot be assigned (-1), or no server is
+  *         advised for the bucket
+  * @throws IllegalStateException if the resolver returns a null routing
+  *         object or a fixed partition resolver returns a null partition name
+  */
+ public ServerLocation getBucketServerLocation(Region region,
+     Operation operation, Object key, Object value, Object callbackArg) {
+   final ClientPartitionAdvisor advisor = getClientPartitionAdvisor(region.getFullPath());
+   if (advisor == null) {
+     return null;
+   }
+   final int bucketCount = advisor.getTotalNumBuckets();
+
+   final PartitionResolver resolver = getResolver(region, key, callbackArg);
+   EntryOperation entryOp = null;
+   // Without a client-side resolver the key itself is the routing object
+   // (assumes the server-side region is not using a resolver either).
+   Object routingObject = key;
+   if (resolver != null) {
+     entryOp = new EntryOperationImpl(region, operation, key,
+         value, callbackArg);
+     routingObject = resolver.getRoutingObject(entryOp);
+     if (routingObject == null) {
+       throw new IllegalStateException(
+           LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
+               .toLocalizedString());
+     }
+   }
+
+   final int bucketId;
+   if (resolver instanceof FixedPartitionResolver) {
+     // resolver is non-null here, so entryOp was already created above.
+     final String partitionName = ((FixedPartitionResolver)resolver).getPartitionName(
+         entryOp, advisor.getFixedPartitionNames());
+     if (partitionName == null) {
+       final Object[] messageArgs = new Object[] { region.getName(), resolver };
+       throw new IllegalStateException(
+           LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
+               .toLocalizedString(messageArgs));
+     }
+     bucketId = advisor.assignFixedBucketId(region, partitionName, routingObject);
+     if (bucketId == -1) {
+       return null;
+     }
+   }
+   else {
+     bucketId = PartitionedRegionHelper.getHashKey(routingObject, bucketCount);
+   }
+
+   final ServerLocation advised = getServerLocation(region, operation, bucketId);
+   if (advised == null) {
+     return null;
+   }
+   return new ServerLocation(advised.getHostName(), advised.getPort());
+ }
+
+ /**
+  * Asks the region's advisor for a server hosting the given bucket: any
+  * advised location for gets, the primary location for everything else.
+  * The region's own advisor is used; colocation is not followed here.
+  *
+  * @return the advised location, or null when the region has no advisor
+  */
+ private ServerLocation getServerLocation(Region region, Operation operation,
+     int bucketId) {
+   final String path = region.getFullPath();
+   final ClientPartitionAdvisor advisor = getClientPartitionAdvisor(path);
+   if (advisor != null) {
+     if (operation.isGet()) {
+       return advisor.adviseServerLocation(bucketId);
+     }
+     return advisor.advisePrimaryServerLocation(bucketId);
+   }
+
+   if (logger.isDebugEnabled()) {
+     logger.debug("ClientMetadataService#getServerLocation : Region {} prAdvisor does not exist.", path);
+   }
+   return null;
+ }
+
+ /**
+  * Convenience overload that treats {@code routingKeys} as keys rather than
+  * bucket ids (delegates with {@code bucketsAsFilter} set to false).
+  */
+ public Map<ServerLocation, HashSet> getServerToFilterMap(
+     final Collection routingKeys, final Region region, boolean primaryMembersNeeded) {
+   final boolean bucketsAsFilter = false;
+   return getServerToFilterMap(routingKeys, region, primaryMembersNeeded, bucketsAsFilter);
+ }
+
+ /**
+  * Groups the routing keys by the server that should handle them.
+  *
+  * @param routingKeys keys, or bucket ids (Integer) when {@code bucketsAsFilter} is true
+  * @param primaryMembersNeeded whether only primary bucket owners may be chosen
+  * @return server to keys map, or null when metadata is not available (a
+  *         background metadata fetch is scheduled in that case) or when no
+  *         server could be selected
+  */
+ public Map<ServerLocation, HashSet> getServerToFilterMap(
+     final Collection routingKeys, final Region region, boolean primaryMembersNeeded,
+     boolean bucketsAsFilter) {
+   final ClientPartitionAdvisor advisor = getClientPartitionAdvisor(region.getFullPath());
+   if (advisor == null || advisor.adviseRandomServerLocation() == null) {
+     scheduleGetPRMetaData((LocalRegion)region, false);
+     return null;
+   }
+
+   final HashMap<Integer, HashSet> keysByBucket = groupByBucketOnClientSide(
+       region, advisor, routingKeys, bucketsAsFilter);
+   final HashMap<ServerLocation, HashSet> keysByServer = new HashMap<ServerLocation, HashSet>();
+   final HashMap<ServerLocation, HashSet<Integer>> bucketsByServer = groupByServerToBuckets(
+       advisor, keysByBucket.keySet(), primaryMembersNeeded);
+   if (bucketsByServer == null) {
+     return null;
+   }
+
+   for (Map.Entry<ServerLocation, HashSet<Integer>> serverEntry : bucketsByServer.entrySet()) {
+     final ServerLocation server = serverEntry.getKey();
+     for (Integer bucketId : serverEntry.getValue()) {
+       // LinkedHashSet keeps key order stable; the keys are iterated several times.
+       HashSet serverKeys = keysByServer.get(server);
+       if (serverKeys == null) {
+         serverKeys = new LinkedHashSet();
+         keysByServer.put(server, serverKeys);
+       }
+       serverKeys.addAll(keysByBucket.get(bucketId));
+     }
+   }
+
+   if (logger.isDebugEnabled()) {
+     logger.debug("Returning server to keys map : {}", keysByServer);
+   }
+   return keysByServer;
+ }
+
+ /**
+  * Groups every bucket id of the region (0 .. totalNumBuckets-1) by server.
+  *
+  * @return server to buckets map, or null when metadata is not available
+  *         (a background metadata fetch is scheduled in that case)
+  */
+ public HashMap<ServerLocation, HashSet<Integer>> groupByServerToAllBuckets(Region region, boolean primaryOnly) {
+   final ClientPartitionAdvisor advisor = getClientPartitionAdvisor(region.getFullPath());
+   if (advisor == null || advisor.adviseRandomServerLocation() == null) {
+     scheduleGetPRMetaData((LocalRegion)region, false);
+     return null;
+   }
+
+   final int bucketCount = advisor.getTotalNumBuckets();
+   final HashSet<Integer> everyBucket = new HashSet<Integer>();
+   int bucketId = 0;
+   while (bucketId < bucketCount) {
+     everyBucket.add(bucketId);
+     bucketId++;
+   }
+   return groupByServerToBuckets(advisor, everyBucket, primaryOnly);
+ }
+ /**
+ * This function should make a map of server to buckets it is hosting.
+ * If for some bucket servers are not available due to mismatch in metadata
+ * it should fill up a random server for it.
+ */
+ private HashMap<ServerLocation, HashSet<Integer>> groupByServerToBuckets(
+ ClientPartitionAdvisor prAdvisor, Set<Integer> bucketSet,
+ boolean primaryOnly) {
+ if (primaryOnly) {
+ Set<Integer> bucketsWithoutServer = new HashSet<Integer>();
+ HashMap<ServerLocation, HashSet<Integer>> serverToBucketsMap = new HashMap<ServerLocation, HashSet<Integer>>();
+ for (Integer bucketId : bucketSet) {
+ ServerLocation server = prAdvisor.advisePrimaryServerLocation(bucketId);
+ if (server == null) {
+ bucketsWithoutServer.add(bucketId);
+ continue;
+ }
+ HashSet<Integer> buckets = serverToBucketsMap.get(server);
+ if (buckets == null) {
+ buckets = new HashSet<Integer>(); // faster if this was an ArrayList
+ serverToBucketsMap.put(server, buckets);
+ }
+ buckets.add(bucketId);
+ }
+
+ if (!serverToBucketsMap.isEmpty() ) {
+ serverToBucketsMap.entrySet().iterator().next().getValue().addAll(
+ bucketsWithoutServer);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("ClientMetadataService: The server to bucket map is : {}", serverToBucketsMap);
+ }
+
+ return serverToBucketsMap;
+ }
+ else {
+ return pruneNodes(prAdvisor, bucketSet);
+ }
+ }
+
+
+ /**
+  * Selects a set of servers that together host the requested buckets.
+  * A random server is taken first; further servers are added, as chosen by
+  * {@code findNextServer}, until every bucket with a known owner is covered
+  * or no candidate is left. Buckets without any known owner are appended to
+  * the first entry of the result.
+  *
+  * @return pruned server to buckets map, or null when no server is known
+  *         for any of the buckets
+  */
+ private HashMap<ServerLocation, HashSet<Integer>> pruneNodes(
+     ClientPartitionAdvisor prAdvisor, Set<Integer> buckets) {
+
+   final boolean debug = logger.isDebugEnabled();
+   if (debug) {
+     logger.debug("ClientMetadataService: The buckets to be pruned are: {}", buckets);
+   }
+
+   // Step 1: invert "bucket -> owners" into "owner -> buckets".
+   final Set<Integer> orphanBuckets = new HashSet<Integer>();
+   final HashMap<ServerLocation, HashSet<Integer>> candidates = new HashMap<ServerLocation, HashSet<Integer>>();
+   for (Integer bucketId : buckets) {
+     final List<BucketServerLocation66> owners = prAdvisor.adviseServerLocations(bucketId);
+     if (debug) {
+       logger.debug("ClientMetadataService: For bucketId {} the server list is {}", bucketId, owners);
+     }
+     if (owners == null || owners.isEmpty()) {
+       orphanBuckets.add(bucketId);
+       continue;
+     }
+     if (debug) {
+       logger.debug("ClientMetadataService: The buckets owners of the bucket: {} are: {}", bucketId, owners);
+     }
+     for (ServerLocation owner : owners) {
+       HashSet<Integer> hosted = candidates.get(owner);
+       if (hosted == null) {
+         hosted = new HashSet<Integer>();
+         candidates.put(owner, hosted);
+       }
+       hosted.add(bucketId);
+     }
+   }
+   if (debug) {
+     logger.debug("ClientMetadataService: The server to buckets map is : {}", candidates);
+   }
+
+   if (candidates.isEmpty()) {
+     return null;
+   }
+
+   // Step 2: seed the result with a randomly chosen server.
+   final HashMap<ServerLocation, HashSet<Integer>> pruned = new HashMap<ServerLocation, HashSet<Integer>>();
+   final HashSet<Integer> covered = new HashSet<Integer>();
+   final int candidateCount = candidates.size();
+   final ServerLocation firstServer =
+       (ServerLocation)candidates.keySet().toArray()[rand.nextInt(candidateCount)];
+   final HashSet<Integer> firstBuckets = candidates.remove(firstServer);
+   if (debug) {
+     logger.debug("ClientMetadataService: Adding the server : {} which is random and buckets {} to prunedMap", firstServer, firstBuckets);
+   }
+   covered.addAll(firstBuckets);
+   pruned.put(firstServer, firstBuckets);
+
+   // Step 3: keep adding servers until everything coverable is covered.
+   while (!covered.equals(buckets)) {
+     final ServerLocation next = findNextServer(candidates.entrySet(), covered);
+     if (next == null) {
+       break;
+     }
+     // The server leaves the candidate pool whether or not it contributes.
+     final HashSet<Integer> gained = candidates.remove(next);
+     gained.removeAll(covered);
+     if (gained.isEmpty()) {
+       continue;
+     }
+     covered.addAll(gained);
+     pruned.put(next, gained);
+     if (debug) {
+       logger.debug("ClientMetadataService: Adding the server : {} and buckets {} to prunedServer.", next, gained);
+     }
+   }
+
+   // Step 4: buckets nobody is known to host go to the first selected entry.
+   pruned.entrySet().iterator().next().getValue().addAll(orphanBuckets);
+
+   if (debug) {
+     logger.debug("ClientMetadataService: The final prunedServerToBucket calculated is : {}", pruned);
+   }
+   return pruned;
+ }
+
+
+ private ServerLocation findNextServer(
+ Set<Map.Entry<ServerLocation, HashSet<Integer>>> entrySet,
+ HashSet<Integer> currentBucketSet) {
+
+ ServerLocation server = null;
+ int max = -1;
+ ArrayList<ServerLocation> nodesOfEqualSize = new ArrayList<ServerLocation>();
+ for (Map.Entry<ServerLocation, HashSet<Integer>> entry : entrySet) {
+ HashSet<Integer> buckets = new HashSet<Integer>();
+ buckets.addAll(entry.getValue());
+ buckets.removeAll(currentBucketSet);
+
+ if (max < buckets.size()) {
+ max = buckets.size();
+ server = entry.getKey();
+ nodesOfEqualSize.clear();
+ nodesOfEqualSize.add(server);
+ }
+ else if (max == buckets.size()){
+ nodesOfEqualSize.add(server);
+ }
+ }
+
+ //return node;
+ Random r = new Random();
+ if(nodesOfEqualSize.size() > 0)
+ return nodesOfEqualSize.get(r.nextInt(nodesOfEqualSize.size()));
+
+ return null;
+ }
+
+ private HashMap<Integer, HashSet> groupByBucketOnClientSide(Region region,
+ ClientPartitionAdvisor prAdvisor, Collection routingKeys, boolean bucketsAsFilter) {
+
+ HashMap<Integer, HashSet> bucketToKeysMap = new HashMap();
+ int totalNumberOfBuckets = prAdvisor.getTotalNumBuckets();
+ Iterator i = routingKeys.iterator();
+ while (i.hasNext()) {
+ Object key = i.next();
+ int bucketId = bucketsAsFilter ? ((Integer)key).intValue() :
+ extractBucketID(region, prAdvisor, totalNumberOfBuckets, key);
+ HashSet bucketKeys = bucketToKeysMap.get(bucketId);
+ if (bucketKeys == null) {
+ bucketKeys = new HashSet(); // faster if this was an ArrayList
+ bucketToKeysMap.put(bucketId, bucketKeys);
+ }
+ bucketKeys.add(key);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Bucket to keys map : {}", bucketToKeysMap);
+ }
+ return bucketToKeysMap;
+ }
+
+ /**
+  * Computes the bucket id a key maps to, using the resolver chosen by
+  * {@code getResolver} (with no callback argument) when there is one.
+  *
+  * @return the bucket id; -1 when a fixed partition's bucket cannot be
+  *         assigned, in which case a synchronous metadata fetch is triggered
+  * @throws IllegalStateException if the resolver returns a null routing
+  *         object or a fixed partition resolver returns a null partition name
+  */
+ private int extractBucketID(Region region, ClientPartitionAdvisor prAdvisor,
+     int totalNumberOfBuckets, Object key) {
+   final PartitionResolver resolver = getResolver(region, key, null);
+   EntryOperation entryOp = null;
+   // Without a client-side resolver the key itself is the routing object
+   // (assumes the server-side region is not using a resolver either).
+   Object routingObject = key;
+   if (resolver != null) {
+     entryOp = new EntryOperationImpl(region,
+         Operation.FUNCTION_EXECUTION, key, null, null);
+     routingObject = resolver.getRoutingObject(entryOp);
+     if (routingObject == null) {
+       throw new IllegalStateException(
+           LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
+               .toLocalizedString());
+     }
+   }
+
+   if (!(resolver instanceof FixedPartitionResolver)) {
+     return PartitionedRegionHelper.getHashKey(routingObject, totalNumberOfBuckets);
+   }
+
+   // resolver is non-null here, so entryOp was already created above.
+   final String partitionName = ((FixedPartitionResolver)resolver).getPartitionName(
+       entryOp, prAdvisor.getFixedPartitionNames());
+   if (partitionName == null) {
+     final Object[] messageArgs = new Object[] { region.getName(), resolver };
+     throw new IllegalStateException(
+         LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
+             .toLocalizedString(messageArgs));
+   }
+
+   final int fixedBucketId = prAdvisor.assignFixedBucketId(region, partitionName, routingObject);
+   // -1 can occur when the client does not yet know about every partition
+   // on the server; fetch the metadata proactively.
+   if (fixedBucketId == -1) {
+     scheduleGetPRMetaData((LocalRegion)region, true);
+   }
+   return fixedBucketId;
+ }
+
+
+
+ public void scheduleGetPRMetaData(final LocalRegion region,
+ final boolean isRecursive) {
+ if(this.nonPRs.contains(region.getFullPath())){
+ return;
+ }
+ region.getCachePerfStats().incNonSingleHopsCount();
+ if (isRecursive) {
+ try {
+ getClientPRMetadata(region);
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ SystemFailure.checkFailure();
+ if (logger.isDebugEnabled()) {
+ logger.debug("An exception occurred while fetching metadata", e);
+ }
+ }
+ }
+ else {
+ Runnable fetchTask = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ try {
+ getClientPRMetadata(region);
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ SystemFailure.checkFailure();
+ if (logger.isDebugEnabled()) {
+ logger.debug("An exception occurred while fetching metadata", e);
+ }
+ }
+ }
+ };
+ SingleHopClientExecutor.submitTask(fetchTask);
+ }
+ }
+
+ /**
+  * Fetches partition attributes (if not yet known) and bucket metadata for
+  * the region from the server. If another fetch for the same region already
+  * holds the region's metadata lock, this call returns immediately.
+  * Regions for which the server returns no partition attributes are
+  * remembered as non-partitioned and are not fetched again.
+  * For a colocated region the metadata of the parent region is fetched.
+  *
+  * @param region the client region whose metadata should be refreshed
+  */
+ public final void getClientPRMetadata(LocalRegion region) {
+   final String regionFullPath = region.getFullPath();
+   final InternalPool pool = region.getServerProxy().getPool();
+   // Acquires lock only if it is free, else a request to fetch meta data is in
+   // progress, so just return
+   if (!region.clientMetaDataLock.tryLock()) {
+     return;
+   }
+   try {
+     ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(regionFullPath);
+     if (advisor == null) {
+       advisor = GetClientPartitionAttributesOp.execute(pool, regionFullPath);
+       if (advisor == null) {
+         this.nonPRs.add(regionFullPath);
+         return;
+       }
+       addClientPartitionAdvisor(regionFullPath, advisor);
+     }
+     else if (advisor.getFixedPAMap() != null && !advisor.isFPAAttrsComplete()) {
+       final ClientPartitionAdvisor newAdvisor = GetClientPartitionAttributesOp
+           .execute(pool, regionFullPath);
+       // The op can return null (see the branch above); keep the current
+       // fixed-partition map in that case instead of failing.
+       if (newAdvisor != null) {
+         advisor.updateFixedPAMap(newAdvisor.getFixedPAMap());
+       }
+     }
+
+     final String colocatedWith = advisor.getColocatedWith();
+     if (colocatedWith == null) {
+       isMetadataRefreshed_TEST_ONLY = true;
+       GetClientPRMetaDataOp.execute(pool, regionFullPath, this);
+       region.getCachePerfStats().incMetaDataRefreshCount();
+       return;
+     }
+
+     final ClientPartitionAdvisor colocatedAdvisor = this.getClientPartitionAdvisor(colocatedWith);
+     final LocalRegion leaderRegion = (LocalRegion)region.getCache()
+         .getRegion(colocatedWith);
+     // The parent region may not exist on this client; only recurse into it
+     // when there is a region object to fetch the attributes for.
+     if (colocatedAdvisor == null && leaderRegion != null) {
+       scheduleGetPRMetaData(leaderRegion, true);
+       return;
+     }
+     isMetadataRefreshed_TEST_ONLY = true;
+     GetClientPRMetaDataOp.execute(pool, colocatedWith, this);
+     final LocalRegion statsOwner = (leaderRegion != null) ? leaderRegion : region;
+     statsOwner.getCachePerfStats().incMetaDataRefreshCount();
+   }
+   finally {
+     region.clientMetaDataLock.unlock();
+   }
+ }
+
+ /**
+  * Variant of {@link #scheduleGetPRMetaData(LocalRegion, boolean)} that
+  * takes the network hop type reported for the operation into account.
+  * When the region's advisor has a non-empty server group and server groups
+  * are honoured for single hop, a hop type of 2 does not trigger a refresh.
+  * In every other case the two-argument overload does the actual work.
+  *
+  * @param isRecursive when true the fetch runs on the calling thread;
+  *        otherwise it is submitted to SingleHopClientExecutor
+  * @param nwHopType hop type value; only the value 2 is treated specially here
+  */
+ public void scheduleGetPRMetaData(final LocalRegion region,
+     final boolean isRecursive, byte nwHopType) {
+   if (this.nonPRs.contains(region.getFullPath())) {
+     return;
+   }
+   ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath());
+   if (advisor != null && advisor.getServerGroup().length() != 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("Scheduling metadata refresh : {}", nwHopType);
+     }
+     if (nwHopType == (byte)2) {
+       return;
+     }
+   }
+   // Stats update, fetch and failure handling are shared with the
+   // two-argument overload rather than duplicated here.
+   scheduleGetPRMetaData(region, isRecursive);
+ }
+
+ /**
+  * Tells the advisor of every known region to drop the given server
+  * location. Does nothing when no region paths are available.
+  */
+ public void removeBucketServerLocation(ServerLocation serverLocation) {
+   final Set<String> regionPaths = getAllRegionFullPaths();
+   final boolean debug = logger.isDebugEnabled();
+   if (debug) {
+     logger.debug("ClientMetadataService removing a ServerLocation :{}{}", serverLocation, regionPaths);
+   }
+   if (regionPaths == null) {
+     return;
+   }
+   for (String path : regionPaths) {
+     final ClientPartitionAdvisor advisor = getClientPartitionAdvisor(path);
+     if (debug) {
+       logger.debug("ClientMetadataService removing from {}{}", path, advisor);
+     }
+     if (advisor == null) {
+       continue;
+     }
+     advisor.removeBucketServerLocation(serverLocation);
+   }
+ }
+
+ /**
+  * Returns the version carried by the primary location of the bucket the
+  * key maps to.
+  *
+  * @return the version, or 0 when the region has no advisor or no primary
+  *         location is known for the bucket
+  * @throws IllegalStateException if the resolver returns a null routing
+  *         object or a fixed partition resolver returns a null partition name
+  */
+ public byte getMetaDataVersion(Region region, Operation operation,
+     Object key, Object value, Object callbackArg) {
+   final ClientPartitionAdvisor advisor = getClientPartitionAdvisor(region.getFullPath());
+   if (advisor == null) {
+     return 0;
+   }
+   final int bucketCount = advisor.getTotalNumBuckets();
+
+   final PartitionResolver resolver = getResolver(region, key, callbackArg);
+   EntryOperation entryOp = null;
+   // Without a client-side resolver the key itself is the routing object
+   // (assumes the server-side region is not using a resolver either).
+   Object routingObject = key;
+   if (resolver != null) {
+     entryOp = new EntryOperationImpl(region, operation, key,
+         value, callbackArg);
+     routingObject = resolver.getRoutingObject(entryOp);
+     if (routingObject == null) {
+       throw new IllegalStateException(
+           LocalizedStrings.PartitionedRegionHelper_THE_ROUTINGOBJECT_RETURNED_BY_PARTITIONRESOLVER_IS_NULL
+               .toLocalizedString());
+     }
+   }
+
+   final int bucketId;
+   if (resolver instanceof FixedPartitionResolver) {
+     // resolver is non-null here, so entryOp was already created above.
+     final String partitionName = ((FixedPartitionResolver)resolver).getPartitionName(
+         entryOp, advisor.getFixedPartitionNames());
+     if (partitionName == null) {
+       final Object[] messageArgs = new Object[] { region.getName(), resolver };
+       throw new IllegalStateException(
+           LocalizedStrings.PartitionedRegionHelper_FOR_REGION_0_PARTITIONRESOLVER_1_RETURNED_PARTITION_NAME_NULL
+               .toLocalizedString(messageArgs));
+     }
+     bucketId = advisor.assignFixedBucketId(region, partitionName, routingObject);
+   }
+   else {
+     bucketId = PartitionedRegionHelper.getHashKey(routingObject, bucketCount);
+   }
+
+   final BucketServerLocation66 primary =
+       (BucketServerLocation66)getPrimaryServerLocation(region, bucketId);
+   return (primary == null) ? 0 : primary.getVersion();
+ }
+
+ /**
+  * Returns the primary server location of a bucket. For a colocated region
+  * the advisor of the region it is colocated with is consulted instead of
+  * the region's own advisor.
+  *
+  * @return the advised primary location, or null when the required advisor
+  *         does not exist
+  */
+ private ServerLocation getPrimaryServerLocation(Region region, int bucketId) {
+   final String regionFullPath = region.getFullPath();
+   ClientPartitionAdvisor prAdvisor = this.getClientPartitionAdvisor(regionFullPath);
+   if (prAdvisor == null) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("ClientMetadataService#getPrimaryServerLocation : Region {} prAdvisor does not exist.", regionFullPath);
+     }
+     return null;
+   }
+
+   final String colocatedWith = prAdvisor.getColocatedWith();
+   if (colocatedWith != null) {
+     prAdvisor = this.getClientPartitionAdvisor(colocatedWith);
+     if (prAdvisor == null) {
+       if (logger.isDebugEnabled()) {
+         // Name the parent region: it is the one whose advisor is missing.
+         logger.debug("ClientMetadataService#getPrimaryServerLocation : Region {} is colocated with {} whose prAdvisor does not exist.", regionFullPath, colocatedWith);
+       }
+       return null;
+     }
+   }
+   return prAdvisor.advisePrimaryServerLocation(bucketId);
+ }
+
+ private void addClientPartitionAdvisor(String regionFullPath,
+ ClientPartitionAdvisor advisor) {
+ if (this.cache.isClosed() || this.clientPRAdvisors == null) {
+ return;
+ }
+ try {
+ this.clientPRAdvisors.put(regionFullPath, advisor);
+ if (advisor.getColocatedWith() != null) {
+ String parentRegionPath = advisor.getColocatedWith();
+ Set<ClientPartitionAdvisor> colocatedAdvisors = this.colocatedPRAdvisors.get(parentRegionPath);
+ if(colocatedAdvisors == null){
+ colocatedAdvisors = new CopyOnWriteArraySet<ClientPartitionAdvisor>();
+ this.colocatedPRAdvisors.put(parentRegionPath, colocatedAdvisors);
+ }
+ colocatedAdvisors.add(advisor);
+ }
+ }
+ catch (Exception npe) {
+ // ignore, shutdown case
+ }
+
+ }
+
+ /**
+  * Looks up the advisor registered for the region path.
+  *
+  * @return the advisor, or null when none is registered, the cache is
+  *         closed, or the lookup throws
+  */
+ public ClientPartitionAdvisor getClientPartitionAdvisor(String regionFullPath) {
+   // clientPRAdvisors is a final, eagerly initialised field, so only the
+   // cache state needs checking.
+   if (this.cache.isClosed()) {
+     return null;
+   }
+   try {
+     return this.clientPRAdvisors.get(regionFullPath);
+   }
+   catch (Exception ignored) {
+     return null;
+   }
+ }
+
+ /**
+  * Returns the advisors of the regions colocated with the given region.
+  *
+  * @return the colocated advisors, or null when there are none registered
+  *         or the cache is closed
+  */
+ public Set<ClientPartitionAdvisor> getColocatedClientPartitionAdvisor(String regionFullPath) {
+   // Both maps are final, eagerly initialised fields, so only the cache
+   // state needs checking.
+   final boolean cacheClosed = this.cache.isClosed();
+   return cacheClosed ? null : this.colocatedPRAdvisors.get(regionFullPath);
+ }
+
+ /**
+  * Returns the key set view of the advisor map, i.e. the full paths of all
+  * regions with a registered advisor, or null when the cache is closed.
+  */
+ private Set<String> getAllRegionFullPaths() {
+   // clientPRAdvisors is a final, eagerly initialised field, so only the
+   // cache state needs checking.
+   if (this.cache.isClosed()) {
+     return null;
+   }
+   try {
+     return this.clientPRAdvisors.keySet();
+   }
+   catch (Exception ignored) {
+     return null;
+   }
+ }
+
+ /** Discards all region advisors and all colocation groupings held by this service. */
+ public void close() {
+   clientPRAdvisors.clear();
+   colocatedPRAdvisors.clear();
+ }
+
+ /** Test hook: reports the current value of the metadata-refreshed flag. */
+ public boolean isRefreshMetadataTestOnly() {
+   return this.isMetadataRefreshed_TEST_ONLY;
+ }
+
+ /** Test hook: overwrites the metadata-refreshed flag with the given value. */
+ public void satisfyRefreshMetadata_TEST_ONLY(boolean isRefreshMetadataTestOnly) {
+   this.isMetadataRefreshed_TEST_ONLY = isRefreshMetadataTestOnly;
+ }
+
+ /** Test hook: exposes the live (not copied) region path to advisor map. */
+ public Map<String, ClientPartitionAdvisor> getClientPRMetadata_TEST_ONLY() {
+   return this.clientPRAdvisors;
+ }
+
+ /** Returns the live (not copied) map from region full path to its advisor. */
+ public Map<String, ClientPartitionAdvisor> getClientPartitionAttributesMap() {
+   return this.clientPRAdvisors;
+ }
+
+ /**
+  * Reports whether the "gemfire.PoolImpl.honourServerGroupsInPRSingleHop"
+  * system property was set to true when this service was created.
+  */
+ public boolean honourServerGroup() {
+   return this.HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP;
+ }
+
+ /**
+  * Test-only flag, set to true whenever a metadata fetch is issued.
+  * Volatile because it is written by fetch tasks running on executor
+  * threads and read from other threads through the public accessor.
+  */
+ private volatile boolean isMetadataRefreshed_TEST_ONLY = false;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java
new file mode 100755
index 0000000..f060b7a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientPartitionAdvisor.java
@@ -0,0 +1,279 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.cache.FixedPartitionAttributes;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.cache.PartitionResolver;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.ClassPathLoader;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation;
+import com.gemstone.gemfire.internal.cache.FixedPartitionAttributesImpl;
+import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Stores the information such as partition attributes and meta data details
+ * that a client keeps for one partitioned region: the servers known to host
+ * each bucket, plus the total bucket count, colocation, partition resolver and
+ * fixed-partition layout supplied at construction.
+ *
+ * @author Suranjan Kumar
+ * @author Yogesh Mahajan
+ *
+ * @since 6.5
+ *
+ */
+public class ClientPartitionAdvisor {
+
+  /** Bucket id -> servers currently recorded as hosting that bucket. */
+  private final ConcurrentMap<Integer, List<BucketServerLocation66>> bucketServerLocationsMap
+      = new ConcurrentHashMap<Integer, List<BucketServerLocation66>>();
+
+  private final int totalNumBuckets;
+
+  /** Server group used to filter locations; the empty string disables filtering. */
+  private String serverGroup = "";
+
+  private final String colocatedWith;
+
+  private PartitionResolver partitionResolver = null;
+
+  /**
+   * Fixed partition name -> two-element list of [number of buckets, starting bucket id].
+   * Remains {@code null} when no fixed partition attributes were passed to the constructor.
+   */
+  private Map<String, List<Integer>> fixedPAMap = null;
+
+  /** True when the bucket counts of the supplied fixed partitions add up to the total. */
+  private boolean fpaAttrsCompletes = false;
+
+  /**
+   * Creates the advisor and, when requested, instantiates the partition resolver
+   * and records the fixed-partition layout.
+   *
+   * @param totalNumBuckets total number of buckets of the partitioned region
+   * @param colocatedWith name of the region this one is colocated with, may be {@code null}
+   * @param partitionResolverName class name of the resolver to instantiate through its
+   *        no-argument constructor, or {@code null} for none
+   * @param fpaSet fixed partition attributes, or {@code null} when there are none
+   * @throws InternalGemFireException if the resolver class cannot be loaded or instantiated
+   */
+  @SuppressWarnings("unchecked")
+  public ClientPartitionAdvisor(int totalNumBuckets, String colocatedWith,
+      String partitionResolverName, Set<FixedPartitionAttributes> fpaSet) {
+
+    this.totalNumBuckets = totalNumBuckets;
+    this.colocatedWith = colocatedWith;
+    try {
+      if (partitionResolverName != null) {
+        this.partitionResolver = (PartitionResolver)
+            ClassPathLoader.getLatest().forName(partitionResolverName).newInstance();
+      }
+    }
+    catch (Exception e) {
+      // Chain the original failure instead of printing it, so the caller sees the root cause.
+      throw new InternalGemFireException(
+          LocalizedStrings.ClientPartitionAdvisor_CANNOT_CREATE_AN_INSTANCE_OF_PARTITION_RESOLVER_0
+              .toLocalizedString(partitionResolverName), e);
+    }
+    if (fpaSet != null) {
+      fixedPAMap = new ConcurrentHashMap<String, List<Integer>>();
+      int totalFPABuckets = 0;
+      for (FixedPartitionAttributes fpa : fpaSet) {
+        List<Integer> attrList = new ArrayList<Integer>(2);
+        totalFPABuckets += fpa.getNumBuckets();
+        attrList.add(fpa.getNumBuckets());
+        attrList.add(((FixedPartitionAttributesImpl)fpa).getStartingBucketID());
+        fixedPAMap.put(fpa.getPartitionName(), attrList);
+      }
+      if (totalFPABuckets == this.totalNumBuckets) {
+        this.fpaAttrsCompletes = true;
+      }
+    }
+  }
+
+  /**
+   * Picks one of the servers recorded for the bucket, chosen uniformly at random.
+   *
+   * @param bucketId the bucket to look up
+   * @return a server hosting the bucket, or {@code null} when none is recorded
+   */
+  public ServerLocation adviseServerLocation(int bucketId) {
+    List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
+    if (locations == null) {
+      return null;
+    }
+    List<BucketServerLocation66> locationsCopy =
+        new ArrayList<BucketServerLocation66>(locations);
+    // TODO: We need to consider Load Balancing Policy
+    if (locationsCopy.isEmpty()) {
+      return null;
+    }
+    if (locationsCopy.size() == 1) {
+      return locationsCopy.get(0);
+    }
+    // nextInt's bound is exclusive, so pass the full size to make every entry eligible.
+    int index = new Random().nextInt(locationsCopy.size());
+    return locationsCopy.get(index);
+  }
+
+  /**
+   * Picks a bucket at random and returns the first server recorded for it.
+   *
+   * @return a server location, or {@code null} when no bucket is known or the
+   *         chosen bucket has no servers
+   */
+  public ServerLocation adviseRandomServerLocation() {
+    List<Integer> bucketList = new ArrayList<Integer>(
+        this.bucketServerLocationsMap.keySet());
+    if (bucketList.isEmpty()) {
+      return null;
+    }
+    Collections.shuffle(bucketList);
+    List<BucketServerLocation66> locations =
+        this.bucketServerLocationsMap.get(bucketList.get(0));
+    if (locations == null) {
+      return null;
+    }
+    List<BucketServerLocation66> serverList =
+        new ArrayList<BucketServerLocation66>(locations);
+    return serverList.isEmpty() ? null : serverList.get(0);
+  }
+
+  /**
+   * Returns a private copy of the servers recorded for the bucket.
+   *
+   * @param bucketId the bucket to look up
+   * @return a new list of locations, or {@code null} when the bucket is unknown
+   */
+  public List<BucketServerLocation66> adviseServerLocations(int bucketId) {
+    List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
+    if (locations == null) {
+      return null;
+    }
+    return new ArrayList<BucketServerLocation66>(locations);
+  }
+
+  /**
+   * Returns the first recorded location of the bucket that reports itself as primary.
+   *
+   * @param bucketId the bucket to look up
+   * @return the primary location, or {@code null} when the bucket is unknown or
+   *         none of its locations is primary
+   */
+  public ServerLocation advisePrimaryServerLocation(int bucketId) {
+    List<BucketServerLocation66> locations = this.bucketServerLocationsMap.get(bucketId);
+    if (locations == null) {
+      return null;
+    }
+    for (BucketServerLocation66 loc : new ArrayList<BucketServerLocation66>(locations)) {
+      if (loc.isPrimary()) {
+        return loc;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Replaces the locations recorded for a bucket. When a server group is set on
+   * this advisor and the metadata service honours server groups, only locations
+   * that belong to that group, or that report no groups at all, are kept.
+   *
+   * @param bucketId the bucket being updated
+   * @param bucketServerLocations the new locations of the bucket
+   * @param cms service consulted for the honour-server-group setting
+   */
+  public void updateBucketServerLocations(int bucketId,
+      List<BucketServerLocation66> bucketServerLocations, ClientMetadataService cms) {
+    boolean honourServerGroup = cms.honourServerGroup();
+    List<BucketServerLocation66> locations;
+
+    if (this.serverGroup.length() != 0 && honourServerGroup) {
+      List<BucketServerLocation66> filtered = new ArrayList<BucketServerLocation66>();
+      for (BucketServerLocation66 s : bucketServerLocations) {
+        String[] groups = s.getServerGroups();
+        if (groups.length == 0) {
+          filtered.add(s);
+          continue;
+        }
+        for (String group : groups) {
+          if (group.equals(this.serverGroup)) {
+            filtered.add(s);
+            break;
+          }
+        }
+      }
+      locations = Collections.unmodifiableList(filtered);
+    } else {
+      locations = Collections.unmodifiableList(bucketServerLocations);
+    }
+
+    this.bucketServerLocationsMap.put(bucketId, locations);
+  }
+
+  /**
+   * Removes the given server from the location list of every bucket that lists it.
+   * Each bucket is updated with a conditional replace that is retried when the
+   * mapping changed in the meantime.
+   *
+   * @param serverLocation the server to forget
+   */
+  public void removeBucketServerLocation(ServerLocation serverLocation) {
+    for (Map.Entry<Integer, List<BucketServerLocation66>> entry
+        : this.bucketServerLocationsMap.entrySet()) {
+      Integer key = entry.getKey();
+      List<BucketServerLocation66> oldLocations = entry.getValue();
+      // A null here means the mapping vanished while retrying; nothing is left to update.
+      while (oldLocations != null) {
+        List<BucketServerLocation66> newLocations =
+            new ArrayList<BucketServerLocation66>(oldLocations);
+        if (!newLocations.remove(serverLocation)) {
+          break; // this bucket does not list the server
+        }
+        if (this.bucketServerLocationsMap.replace(key, oldLocations, newLocations)) {
+          break; // new list installed
+        }
+        // The mapping changed underneath us: re-read it and try again.
+        oldLocations = this.bucketServerLocationsMap.get(key);
+      }
+    }
+  }
+
+  /**
+   * Test hook exposing the bucket-to-locations map itself (no copy is made).
+   */
+  public Map<Integer, List<BucketServerLocation66>> getBucketServerLocationsMap_TEST_ONLY() {
+    return this.bucketServerLocationsMap;
+  }
+
+  /**
+   * This method returns total number of buckets for a PartitionedRegion.
+   *
+   * @return total number of buckets for a PartitionedRegion.
+   */
+  public int getTotalNumBuckets() {
+    return this.totalNumBuckets;
+  }
+
+  /**
+   * @return the serverGroup
+   */
+  public String getServerGroup() {
+    return this.serverGroup;
+  }
+
+  /**
+   * @param group the server group used to filter bucket locations on update
+   */
+  public void setServerGroup(String group) {
+    this.serverGroup = group;
+  }
+
+  /**
+   * Returns name of the colocated PartitionedRegion on CacheServer
+   */
+  public String getColocatedWith() {
+    return this.colocatedWith;
+  }
+
+  /**
+   * Returns the PartitionResolver set for custom partitioning
+   *
+   * @return <code>PartitionResolver</code> for the PartitionedRegion
+   */
+  public PartitionResolver getPartitionResolver() {
+    return this.partitionResolver;
+  }
+
+  /**
+   * @return the names of the recorded fixed partitions; requires that fixed
+   *         partition attributes were supplied at construction
+   */
+  public Set<String> getFixedPartitionNames() {
+    return this.fixedPAMap.keySet();
+  }
+
+  /**
+   * Computes the bucket id of a key inside a fixed partition: the key's hash
+   * code modulo the partition's bucket count, offset by its starting bucket id.
+   *
+   * @param region not used by this method
+   * @param partition name of the fixed partition
+   * @param resolveKey key whose hash code selects the bucket
+   * @return the bucket id, or -1 when the partition is not known to this client
+   */
+  public int assignFixedBucketId(Region region, String partition,
+      Object resolveKey) {
+    Map<String, List<Integer>> fpaMap = this.fixedPAMap;
+    List<Integer> attList = (fpaMap == null) ? null : fpaMap.get(partition);
+    if (attList == null) {
+      // We don't know as we might not have got the all FPAttributes
+      // from the FPR, So don't throw the exception but send the request
+      // to the server and update the FPA attributes
+      // This exception should be thrown from the server as we will
+      // not be sure of partition not available unless we contact the server.
+      return -1;
+    }
+    int hc = resolveKey.hashCode();
+    int bucketId = Math.abs(hc % (attList.get(0)));
+    return bucketId + attList.get(1);
+  }
+
+  /**
+   * @return the fixed-partition map itself, or {@code null} when no fixed
+   *         partition attributes were supplied at construction
+   */
+  public Map<String, List<Integer>> getFixedPAMap(){
+    return this.fixedPAMap;
+  }
+
+  /**
+   * Merges the given entries into the fixed-partition map; requires that fixed
+   * partition attributes were supplied at construction.
+   *
+   * @param map entries to add or overwrite
+   */
+  public void updateFixedPAMap(Map<String, List<Integer>> map) {
+    this.fixedPAMap.putAll(map);
+  }
+
+  /**
+   * @return true when the fixed partitions passed to the constructor covered
+   *         exactly the total number of buckets
+   */
+  public boolean isFPAAttrsComplete() {
+    return this.fpaAttrsCompletes;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java
new file mode 100644
index 0000000..7526ff7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientRegionFactoryImpl.java
@@ -0,0 +1,262 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CustomExpiry;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.client.ClientRegionFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.compression.Compressor;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.UserSpecifiedRegionAttributes;
+
+/**
+ * {@link ClientRegionFactory} that collects region settings in an
+ * {@link AttributesFactory} and creates the region on the supplied cache.
+ * The distributed system will always default to a loner on a client.
+ *
+ * @author darrel
+ * @since 6.5
+ */
+public class ClientRegionFactoryImpl<K,V> implements ClientRegionFactory<K,V> {
+  private final AttributesFactory<K,V> attrsFactory;
+  private final GemFireCacheImpl cache;
+
+  /**
+   * Builds a factory seeded with the region attributes the cache has registered
+   * under the shortcut's name.
+   *
+   * @param cache the cache that supplies the attributes and later hosts the region
+   * @param pra
+   *          the region shortcut to use
+   * @throws IllegalStateException if the cache has no attributes for the shortcut
+   */
+  public ClientRegionFactoryImpl(GemFireCacheImpl cache, ClientRegionShortcut pra) {
+    final RegionAttributes shortcutAttrs = cache.getRegionAttributes(pra.toString());
+    if (shortcutAttrs == null) {
+      throw new IllegalStateException("The region shortcut " + pra
+          + " has been removed.");
+    }
+    this.cache = cache;
+    this.attrsFactory = new AttributesFactory<K,V>(shortcutAttrs);
+    initAttributeFactoryDefaults();
+  }
+
+  /**
+   * Builds a factory seeded with the named region attributes registered in the cache.
+   *
+   * @param cache the cache that supplies the attributes and later hosts the region
+   * @param refid
+   *          the name of the region attributes to use
+   * @throws IllegalStateException if the cache has no attributes under that name
+   */
+  public ClientRegionFactoryImpl(GemFireCacheImpl cache, String refid) {
+    final RegionAttributes namedAttrs = cache.getRegionAttributes(refid);
+    if (namedAttrs == null) {
+      throw new IllegalStateException("The named region attributes \"" + refid
+          + "\" has not been defined.");
+    }
+    this.cache = cache;
+    this.attrsFactory = new AttributesFactory<K,V>(namedAttrs);
+    initAttributeFactoryDefaults();
+  }
+
+  /** Applies the settings every client region starts from: local scope and the ALL interest policy. */
+  private void initAttributeFactoryDefaults() {
+    attrsFactory.setScope(Scope.LOCAL);
+    attrsFactory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+    // setIgnoreJTA(true) is not applied: in 6.6 and later releases client regions support JTA
+  }
+
+  /** Returns the default pool of the cache used by this factory. */
+  private Pool getDefaultPool() {
+    return cache.getDefaultPool();
+  }
+
+  public ClientRegionFactory<K,V> addCacheListener(CacheListener<K,V> aListener) {
+    attrsFactory.addCacheListener(aListener);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> initCacheListeners(CacheListener<K,V>[] newListeners) {
+    attrsFactory.initCacheListeners(newListeners);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setEvictionAttributes(EvictionAttributes evictionAttributes) {
+    attrsFactory.setEvictionAttributes(evictionAttributes);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setEntryIdleTimeout(ExpirationAttributes idleTimeout) {
+    attrsFactory.setEntryIdleTimeout(idleTimeout);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setCustomEntryIdleTimeout(CustomExpiry<K,V> custom) {
+    attrsFactory.setCustomEntryIdleTimeout(custom);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setEntryTimeToLive(ExpirationAttributes timeToLive) {
+    attrsFactory.setEntryTimeToLive(timeToLive);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setCustomEntryTimeToLive(CustomExpiry<K,V> custom) {
+    attrsFactory.setCustomEntryTimeToLive(custom);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setRegionIdleTimeout(ExpirationAttributes idleTimeout) {
+    attrsFactory.setRegionIdleTimeout(idleTimeout);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setRegionTimeToLive(ExpirationAttributes timeToLive) {
+    attrsFactory.setRegionTimeToLive(timeToLive);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setKeyConstraint(Class<K> keyConstraint) {
+    attrsFactory.setKeyConstraint(keyConstraint);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setValueConstraint(Class<V> valueConstraint) {
+    attrsFactory.setValueConstraint(valueConstraint);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setInitialCapacity(int initialCapacity) {
+    attrsFactory.setInitialCapacity(initialCapacity);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setLoadFactor(float loadFactor) {
+    attrsFactory.setLoadFactor(loadFactor);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setConcurrencyLevel(int concurrencyLevel) {
+    attrsFactory.setConcurrencyLevel(concurrencyLevel);
+    return this;
+  }
+
+  /** Forwards the flag to the attributes factory; unlike the other setters this one returns nothing. */
+  public void setConcurrencyChecksEnabled(boolean concurrencyChecksEnabled) {
+    attrsFactory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
+  }
+
+  public ClientRegionFactory<K,V> setDiskStoreName(String name) {
+    attrsFactory.setDiskStoreName(name);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setDiskSynchronous(boolean isSynchronous) {
+    attrsFactory.setDiskSynchronous(isSynchronous);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setStatisticsEnabled(boolean statisticsEnabled) {
+    attrsFactory.setStatisticsEnabled(statisticsEnabled);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setCloningEnabled(boolean cloningEnable) {
+    attrsFactory.setCloningEnabled(cloningEnable);
+    return this;
+  }
+
+  public ClientRegionFactory<K,V> setPoolName(String poolName) {
+    attrsFactory.setPoolName(poolName);
+    return this;
+  }
+
+  @Override
+  public ClientRegionFactory<K, V> setCompressor(Compressor compressor) {
+    attrsFactory.setCompressor(compressor);
+    return this;
+  }
+
+  /** Creates a root region in the cache from the attributes collected so far. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Region<K,V> create(String name) throws RegionExistsException {
+    final RegionAttributes<K,V> attrs = createRegionAttributes();
+    return cache.basicCreateRegion(name, attrs);
+  }
+
+  /** Creates a subregion of {@code parent}, which is cast to {@link LocalRegion}. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Region<K,V> createSubregion(Region<?,?> parent, String name) throws RegionExistsException {
+    final LocalRegion localParent = (LocalRegion)parent;
+    return localParent.createSubregion(name, createRegionAttributes());
+  }
+
+  /**
+   * Produces the final region attributes. When no pool name is set and the
+   * attributes require one, the cache's default pool is filled in.
+   *
+   * @throws IllegalStateException if a pool name is required but there is no default pool
+   */
+  @SuppressWarnings("deprecation")
+  private RegionAttributes<K,V> createRegionAttributes() {
+    final RegionAttributes<K,V> attrs = attrsFactory.create();
+    final String poolName = attrs.getPoolName();
+    if (poolName != null && !"".equals(poolName)) {
+      return attrs;
+    }
+    final UserSpecifiedRegionAttributes<K, V> userAttrs =
+        (UserSpecifiedRegionAttributes<K, V>)attrs;
+    if (!userAttrs.requiresPoolName) {
+      return attrs;
+    }
+    final Pool defaultPool = getDefaultPool();
+    if (defaultPool == null) {
+      throw new IllegalStateException("The poolName must be set on a client.");
+    }
+    attrsFactory.setPoolName(defaultPool.getName());
+    return attrsFactory.create();
+  }
+
+  // public ClientRegionFactory<K, V> addParallelGatewaySenderId(
+//      String parallelGatewaySenderId) {
+//    this.attrsFactory.addParallelGatewaySenderId(parallelGatewaySenderId);
+//    return this;
+//  }
+//
+//  public ClientRegionFactory<K, V> addSerialGatewaySenderId(
+//      String serialGatewaySenderId) {
+//    this.attrsFactory.addSerialGatewaySenderId(serialGatewaySenderId);
+//    return this;
+//  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java
new file mode 100644
index 0000000..4333768
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientUpdater.java
@@ -0,0 +1,27 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+/**
+ * Contract for a client-side updater; obtained from
+ * {@code ConnectionFactory.createServerToClientConnection}.
+ *
+ * @author dsmith
+ *
+ */
+public interface ClientUpdater {
+
+  /** Closes this updater. */
+  void close();
+
+  /** @return whether this updater is alive */
+  boolean isAlive();
+
+  /**
+   * Joins this updater.
+   *
+   * @param wait the wait value for the join (NOTE(review): unit not visible here, presumably milliseconds)
+   * @throws InterruptedException if the join is interrupted
+   */
+  void join(long wait) throws InterruptedException;
+
+  /** @param failedUpdater the failed updater to record on this updater */
+  void setFailedUpdater(ClientUpdater failedUpdater);
+
+  /** @return whether this updater is processing */
+  boolean isProcessing();
+
+  /** @return whether this updater is primary */
+  boolean isPrimary();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
new file mode 100644
index 0000000..269a354
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CloseConnectionOp.java
@@ -0,0 +1,86 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tell a server that a connection is being closed
+ * @author darrel
+ * @since 5.7
+ */
+public class CloseConnectionOp {
+
+  private CloseConnectionOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Tell a server that a connection is being closed
+   * @param con the connection that is being closed
+   * @param keepAlive whether to keep the proxy alive on the server
+   * @throws Exception whatever executing the op on the connection throws
+   */
+  public static void execute(Connection con, boolean keepAlive) throws Exception {
+    con.execute(new CloseConnectionOpImpl(keepAlive));
+  }
+
+  /** One-part CLOSE_CONNECTION message for which no response is read. */
+  private static class CloseConnectionOpImpl extends AbstractOp {
+    /**
+     * @param keepAlive encoded as a single raw byte, 1 for true and 0 for false
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public CloseConnectionOpImpl(boolean keepAlive) {
+      super(MessageType.CLOSE_CONNECTION, 1);
+      final byte keepAliveFlag = (byte)(keepAlive ? 1 : 0);
+      getMessage().addRawPart(new byte[] {keepAliveFlag}, false);
+    }
+
+    @Override
+    protected Message createResponseMessage() {
+      // no response is sent
+      return null;
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // intentionally empty
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      // Mask the early-ack byte with MESSAGE_HAS_SECURE_PART before sending.
+      final byte earlyAck = getMessage().getEarlyAckByte();
+      getMessage().setEarlyAck((byte)(earlyAck & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      throw new IllegalStateException("should never be called");
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startCloseCon();
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endCloseConSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endCloseCon(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
new file mode 100644
index 0000000..0b2cc6c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/CommitOp.java
@@ -0,0 +1,100 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Does a commit on a server
+ * @author gregp
+ * @since 6.6
+ */
+public class CommitOp {
+
+  private CommitOp() {
+    // static entry point only; no instances allowed
+  }
+
+  /**
+   * Does a commit on a server using connections from the given pool
+   * to communicate with the server.
+   * @param pool the pool to use to communicate with the server.
+   * @param txId the transaction id set on the outgoing message
+   * @return the commit message captured from the response; {@code null} if none was captured
+   */
+  public static TXCommitMessage execute(ExecutablePool pool,int txId) {
+    final CommitOpImpl commit = new CommitOpImpl(txId);
+    pool.execute(commit);
+    return commit.getTXCommitMessageResponse();
+  }
+
+  /** One-part COMMIT message that remembers the {@link TXCommitMessage} it receives. */
+  private static class CommitOpImpl extends AbstractOp {
+    private final int txId;
+
+    /** Set by {@link #processResponse(Message)}; null until then. */
+    private TXCommitMessage tXCommitMessageResponse = null;
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public CommitOpImpl(int txId) {
+      super(MessageType.COMMIT, 1);
+      this.txId = txId;
+      getMessage().setTransactionId(txId);
+    }
+
+    public TXCommitMessage getTXCommitMessageResponse() {
+      return this.tXCommitMessageResponse;
+    }
+
+    @Override
+    public String toString() {
+      return "TXCommit(txId=" + txId + ")";
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      final TXCommitMessage response = (TXCommitMessage)processObjResponse(msg, "commit");
+      assert response != null : "TxCommit response was null";
+      this.tXCommitMessageResponse = response;
+      return response;
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+      // intentionally empty
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      // Mask the early-ack byte with MESSAGE_HAS_SECURE_PART before sending.
+      final byte earlyAck = getMessage().getEarlyAckByte();
+      getMessage().setEarlyAck((byte)(earlyAck & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return MessageType.EXCEPTION == msgType;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startCommit();
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endCommitSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endCommit(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java
new file mode 100644
index 0000000..99bf77f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/Connection.java
@@ -0,0 +1,75 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+
+/**
+ * Represents a connection from a client to a server.
+ * Instances are created, kept, and used by {@link PoolImpl}.
+ * @author darrel
+ * @since 5.7
+ */
+public interface Connection {
+  /** Default connection id. */
+  long DEFAULT_CONNECTION_ID = 26739;
+
+  Socket getSocket();
+
+  ByteBuffer getCommBuffer();
+
+  ConnectionStats getStats();
+
+  /**
+   * Forcefully close the resources used by this connection.
+   * This should be called if the connection or the server dies.
+   */
+  void destroy();
+
+  /**
+   * Return true if this connection has been destroyed
+   */
+  boolean isDestroyed();
+
+  /**
+   * Gracefully close the connection by notifying
+   * the server. It is not necessary to call destroy
+   * after closing the connection.
+   * @param keepAlive What to do with this server-to-client
+   * connection proxy on the server.
+   * @throws Exception if there was an error notifying the server.
+   * The connection will still be destroyed.
+   */
+  void close(boolean keepAlive) throws Exception;
+
+  ServerLocation getServer();
+
+  Endpoint getEndpoint();
+
+  ServerQueueStatus getQueueStatus();
+
+  /**
+   * Executes the given operation on this connection.
+   * @param op the operation to run
+   * @return the result of the operation
+   * @throws Exception if executing the operation fails
+   */
+  Object execute(Op op) throws Exception;
+
+  void emergencyClose();
+
+  short getWanSiteVersion();
+
+  void setWanSiteVersion(short wanSiteVersion);
+
+  int getDistributedSystemId();
+
+  OutputStream getOutputStream();
+
+  InputStream getInputStream();
+
+  void setConnectionID(long id);
+
+  long getConnectionID();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java
new file mode 100644
index 0000000..390db3a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactory.java
@@ -0,0 +1,59 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Set;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * A factory for creating new connections.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public interface ConnectionFactory {
+
+ /**
+ * Create a client to server connection to the given server
+ * @param location the server to connect to
+ * @param forQueue NOTE(review): presumably marks the connection as intended
+ * for a server-to-client queue; confirm against the implementation
+ * @return a connection to that server, or null if
+ * a connection could not be established.
+ * @throws GemFireSecurityException if there was a security exception
+ * while trying to establish a connection.
+ */
+ Connection createClientToServerConnection(ServerLocation location, boolean forQueue)
+ throws GemFireSecurityException;
+
+ /**
+ * Returns the best server for this client to connect to.
+ * Returns null if no servers exist.
+ * @param currentServer if non-null then we are trying to replace a connection
+ * that we have to this server.
+ * @param excludedServers the list of servers
+ * to skip over when finding a server to connect to
+ */
+ ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers);
+
+ /**
+ * Create a client to server connection to any server
+ * that is not in the excluded list.
+ * @param excludedServers the list of servers
+ * to skip over when finding a server to connect to
+ * @return a connection or null if
+ * a connection could not be established.
+ * @throws GemFireSecurityException if there was a security exception
+ * trying to establish a connection.
+ */
+ Connection createClientToServerConnection(Set excludedServers) throws GemFireSecurityException;
+
+ /**
+ * Create a server to client connection for the given endpoint.
+ * @param endpoint the endpoint to connect to
+ * @param qManager the queue manager handed to the new updater
+ * @param isPrimary whether the connection is created as the primary
+ * @param failedUpdater NOTE(review): presumably the updater being replaced
+ * after a failure, may be null; confirm against the implementation
+ * @return the updater for the new connection
+ */
+ ClientUpdater createServerToClientConnection(Endpoint endpoint, QueueManager qManager, boolean isPrimary, ClientUpdater failedUpdater);
+
+ /**
+ * @return the server black list used by this factory
+ */
+ ServerBlackList getBlackList();
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java
new file mode 100644
index 0000000..ae4b851
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ConnectionFactoryImpl.java
@@ -0,0 +1,305 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.GatewayConfigurationException;
+import com.gemstone.gemfire.cache.client.ServerRefusedConnectionException;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.FailureTracker;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.PoolCancelledException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientUpdater;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+
+/**
+ * Creates connections, using a connection source to determine
+ * which server to connect to.
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class ConnectionFactoryImpl implements ConnectionFactory {
+
+ private static final Logger logger = LogService.getLogger();
+
+ //TODO - the handshake holds state. It seems like the code depends
+ //on all of the handshake operations happening in a single thread. I don't think we
+ //want that, need to refactor.
+ // Template handshake: every new connection/updater gets its own copy
+ // via new HandShake(handshake) rather than using this instance directly.
+ private final HandShake handshake;
+ // Passed to ConnectionImpl.connect() for each new connection.
+ private final int socketBufferSize;
+ // Passed to ConnectionImpl.connect() and to each CacheClientUpdater.
+ private final int handShakeTimeout;
+ // When true (or when gatewaySender is non-null) getCommMode() selects
+ // Acceptor.GATEWAY_TO_GATEWAY.
+ private final boolean usedByGateway;
+ // Supplies the per-server FailureTracker and the set of bad servers.
+ private final ServerBlackList blackList;
+ // Handed to each ConnectionImpl and checked before authenticating.
+ private final CancelCriterion cancelCriterion;
+ // Used to find a server / replacement server to connect to.
+ private ConnectionSource source;
+ // Passed to ConnectionImpl.connect(); also set on the template handshake.
+ private int readTimeout;
+ private DistributedSystem ds;
+ private EndpointManager endpointManager;
+ private GatewaySender gatewaySender;
+ // Consulted by authenticateIfRequired() and passed to AuthenticateUserOp.
+ private PoolImpl pool;
+
+ /**
+ * Test hook for client version support.
+ * Set to true by createClientToServerConnection(ServerLocation, boolean)
+ * when the server refuses the connection or an unexpected exception
+ * occurs; this class never resets it to false.
+ * @since 5.7
+ */
+
+ public static boolean testFailedConnectionToServer = false;
+
+ public ConnectionFactoryImpl(ConnectionSource source,
+ EndpointManager endpointManager, DistributedSystem sys,
+ int socketBufferSize, int handShakeTimeout, int readTimeout,
+ ClientProxyMembershipID proxyId, CancelCriterion cancelCriterion,
+ boolean usedByGateway, GatewaySender sender,long pingInterval,
+ boolean multiuserSecureMode, PoolImpl pool) {
+ this.handshake = new HandShake(proxyId, sys);
+ this.handshake.setClientReadTimeout(readTimeout);
+ this.source = source;
+ this.endpointManager = endpointManager;
+ this.ds = sys;
+ this.socketBufferSize = socketBufferSize;
+ this.handShakeTimeout = handShakeTimeout;
+ this.handshake.setMultiuserSecureMode(multiuserSecureMode);
+ this.readTimeout = readTimeout;
+ this.usedByGateway = usedByGateway;
+ this.gatewaySender = sender;
+ this.blackList = new ServerBlackList(pingInterval);
+ this.cancelCriterion = cancelCriterion;
+ this.pool = pool;
+ }
+
+ /**
+  * Starts the server black list, handing it the given executor.
+  *
+  * @param background executor passed on to {@link ServerBlackList#start}
+  */
+ public void start(ScheduledExecutorService background) {
+   this.blackList.start(background);
+ }
+
+ private byte getCommMode(boolean forQueue) {
+ if (this.usedByGateway || (this.gatewaySender != null)) {
+ return Acceptor.GATEWAY_TO_GATEWAY;
+ } else if(forQueue) {
+ return Acceptor.CLIENT_TO_SERVER_FOR_QUEUE;
+ } else {
+ return Acceptor.CLIENT_TO_SERVER;
+ }
+ }
+
+ /**
+  * @return the black list created by this factory's constructor
+  */
+ public ServerBlackList getBlackList() {
+   return this.blackList;
+ }
+
+ /**
+  * Connects to the given server using a private copy of the template
+  * handshake, then authenticates if the pool requires it.
+  * <p>
+  * The server's failure tracker is reset once the connect call succeeds.
+  * If the connection does not end up fully initialized -- including when
+  * an exception is rethrown -- the connection is destroyed and a failure
+  * is recorded against the server.
+  *
+  * @param location the server to connect to
+  * @param forQueue whether the connection is being created for a queue
+  * @return the initialized connection, or null if connecting failed with
+  *         an exception that is not propagated
+  * @throws GemFireSecurityException if a security exception occurred;
+  *         CancelException, GatewayConfigurationException and
+  *         ServerRefusedConnectionException are propagated as well
+  */
+ public Connection createClientToServerConnection(ServerLocation location, boolean forQueue) throws GemFireSecurityException {
+   ConnectionImpl connection = new ConnectionImpl(this.ds, this.cancelCriterion);
+   FailureTracker failureTracker = blackList.getFailureTracker(location);
+
+   boolean initialized = false;
+
+   try {
+     // Each connection works on its own copy of the template handshake.
+     HandShake connHandShake = new HandShake(handshake);
+     connection.connect(endpointManager, location, connHandShake,
+         socketBufferSize, handShakeTimeout, readTimeout, getCommMode(forQueue), this.gatewaySender);
+     failureTracker.reset();
+     connection.setHandShake(connHandShake);
+     authenticateIfRequired(connection);
+     initialized = true;
+   } catch(CancelException e) {
+     //propagate this up, don't retry
+     throw e;
+   } catch(GemFireSecurityException e) {
+     //propagate this up, don't retry
+     throw e;
+   } catch(GatewayConfigurationException e) {
+     //propagate this up, don't retry
+     throw e;
+   } catch(ServerRefusedConnectionException src) {
+     //propagate this up, don't retry
+     logger.warn(LocalizedMessage.create(LocalizedStrings.AutoConnectionSourceImpl_COULD_NOT_CREATE_A_NEW_CONNECTION_TO_SERVER_0, src.getMessage()));
+     testFailedConnectionToServer = true;
+     throw src;
+   } catch (Exception e) {
+     final String reason = e.getMessage();
+     if ("Connection refused".equals(reason) || "Connection reset".equals(reason)) {
+       // These are the most common cases, so don't print an exception.
+       // Log the actual reason: a reset must not be reported as a refusal.
+       if (logger.isDebugEnabled()) {
+         logger.debug("Unable to connect to {}: {}", location, reason);
+       }
+     } else {//print a warning with the exception stack trace
+       logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectException_COULD_NOT_CONNECT_TO_0, location), e);
+     }
+     testFailedConnectionToServer = true;
+   } finally {
+     // Runs for swallowed and rethrown failures alike.
+     if(!initialized) {
+       connection.destroy();
+       failureTracker.addFailure();
+       connection = null;
+     }
+   }
+
+   return connection;
+ }
+
+ /**
+  * Authenticates the connection's server on behalf of the pool when that
+  * is still outstanding.
+  * <p>
+  * Nothing is done when the pool is used by a gateway, when it uses
+  * multiuser authentication, when the server does not require credentials,
+  * or when the server already has a user id (anything other than -1).
+  *
+  * @param conn the freshly connected connection
+  */
+ private void authenticateIfRequired(Connection conn) {
+   cancelCriterion.checkCancelInProgress(null);
+
+   if (pool.isUsedByGateway() || pool.getMultiuserAuthentication()) {
+     return;
+   }
+   if (!conn.getServer().getRequiresCredentials()) {
+     return;
+   }
+   if (conn.getServer().getUserId() != -1) {
+     return;
+   }
+
+   Long uniqueID = (Long)AuthenticateUserOp.executeOn(conn, pool);
+   conn.getServer().setUserId(uniqueID);
+   if (logger.isDebugEnabled()) {
+     logger.debug("CFI.authenticateIfRequired() Completed authentication on {}", conn);
+   }
+ }
+
+ /**
+  * Finds the server a connection should be (re)established to.
+  * <p>
+  * If a current server is given and the source reports itself balanced,
+  * the current server is kept. Otherwise the source is asked for a
+  * replacement, skipping both the caller's exclusions and the black-listed
+  * servers. If that finds nothing and the black list added exclusions, the
+  * source is asked again with only the caller's exclusions.
+  *
+  * @param currentServer the server being replaced, or null
+  * @param excludedServers servers the caller does not want; not modified
+  * @return the chosen server, or null if the source found none
+  */
+ public ServerLocation findBestServer(ServerLocation currentServer, Set excludedServers) {
+   if (currentServer != null && source.isBalanced()) {
+     return currentServer;
+   }
+
+   final Set callerExclusions = excludedServers;
+   final Set combinedExclusions = new HashSet(callerExclusions);
+   combinedExclusions.addAll(blackList.getBadServers());
+
+   ServerLocation replacement = source.findReplacementServer(currentServer, combinedExclusions);
+
+   // Nothing worked! Retry without the black list, but only if the black
+   // list actually contributed exclusions.
+   final boolean blackListNarrowedSearch = combinedExclusions.size() > callerExclusions.size();
+   if (replacement == null && blackListNarrowedSearch) {
+     replacement = source.findReplacementServer(currentServer, callerExclusions);
+   }
+
+   if (replacement == null && logger.isDebugEnabled()) {
+     logger.debug("Source was unable to findForReplacement any servers");
+   }
+   return replacement;
+ }
+
+ /**
+  * Connects to any server the source offers that is not excluded.
+  * <p>
+  * Black-listed servers are skipped at first. Every server that is tried
+  * is added to the working exclusion set. When the source runs out of
+  * candidates, the black-listed servers are re-admitted once (the caller's
+  * exclusions always stay in force) and the search continues if that
+  * re-admitted anything.
+  *
+  * @param excludedServers servers the caller does not want; not modified
+  * @return a connection, or null if no server could be connected to
+  * @throws GemFireSecurityException if a security exception occurred;
+  *         CancelException and GatewayConfigurationException are propagated
+  *         immediately as well. If no connection could be made and a server
+  *         refused the connection, the most recent
+  *         ServerRefusedConnectionException is thrown instead of returning
+  *         null.
+  */
+ public Connection createClientToServerConnection(Set excludedServers) throws GemFireSecurityException {
+   final Set callerExclusions = excludedServers;
+   final Set skipped = new HashSet(callerExclusions);
+   final Set blackListed = blackList.getBadServers();
+   skipped.addAll(blackListed);
+
+   Connection established = null;
+   // Most recent refusal; reported only if no server can be connected to.
+   RuntimeException refusal = null;
+   boolean blackListHonored = true;
+
+   while (established == null) {
+     final ServerLocation candidate = source.findServer(skipped);
+
+     if (candidate != null) {
+       try {
+         established = createClientToServerConnection(candidate, false);
+       } catch (CancelException e) {
+         //propagate this up immediately
+         throw e;
+       } catch (GemFireSecurityException e) {
+         //propagate this up immediately
+         throw e;
+       } catch (GatewayConfigurationException e) {
+         //propagate this up immediately
+         throw e;
+       } catch (ServerRefusedConnectionException srce) {
+         refusal = srce;
+         if (logger.isDebugEnabled()) {
+           logger.debug("ServerRefusedConnectionException attempting to connect to {}", candidate , srce);
+         }
+       } catch (Exception e) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.ConnectException_COULD_NOT_CONNECT_TO_0, candidate), e);
+       }
+
+       // Whatever the outcome, this server has now been tried.
+       skipped.add(candidate);
+       continue;
+     }
+
+     // The source has no more candidates.
+     if (blackListHonored) {
+       // Nothing worked! Let's try without the blacklist.
+       blackListHonored = false;
+       final int sizeBefore = skipped.size();
+       skipped.removeAll(blackListed);
+       // make sure we didn't remove any of the ones that the caller set not to use
+       skipped.addAll(callerExclusions);
+       if (skipped.size() < sizeBefore) {
+         // We are able to remove some exclusions, so lets give this another whirl.
+         continue;
+       }
+     }
+
+     if (logger.isDebugEnabled()) {
+       logger.debug("Source was unable to locate any servers");
+     }
+     if (refusal != null) {
+       throw refusal;
+     }
+     return null;
+   }
+
+   return established;
+ }
+
+ /**
+  * Creates and starts a {@link CacheClientUpdater} for the given endpoint,
+  * using a private copy of the template handshake.
+  *
+  * @param endpoint the endpoint the updater is created for
+  * @param qManager the queue manager handed to the updater
+  * @param isPrimary passed through to the updater
+  * @param failedUpdater recorded on the new updater before it is started
+  * @return the started updater, or null if the new updater reports that it
+  *         is not connected
+  */
+ public ClientUpdater createServerToClientConnection(Endpoint endpoint,
+     QueueManager qManager, boolean isPrimary, ClientUpdater failedUpdater) {
+   final String updaterName = CacheClientUpdater.CLIENT_UPDATER_THREAD_NAME
+       + " on " + endpoint.getMemberId() + " port " + endpoint.getLocation().getPort();
+   if (logger.isDebugEnabled()) {
+     logger.debug("Establishing: {}", updaterName);
+   }
+
+   final CacheClientUpdater clientUpdater = new CacheClientUpdater(updaterName,
+       endpoint.getLocation(), isPrimary, ds, new HandShake(this.handshake), qManager,
+       endpointManager, endpoint, handShakeTimeout);
+
+   if (clientUpdater.isConnected()) {
+     clientUpdater.setFailedUpdater(failedUpdater);
+     // Launch the thread
+     clientUpdater.start();
+
+     // Historical note (bug 36690): this method used to wait on
+     // updater.waitForInitialization() here. That wait is disabled: a
+     // thread that leaves run() after an exception notifies every waiter on
+     // the thread object, so the wait returned even though the updater had
+     // failed.
+     return clientUpdater;
+   }
+   return null;
+ }
+}
+
+