Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:09 UTC

[081/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 0000000,b145a91..541c453
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@@ -1,0 -1,11398 +1,11398 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Hashtable;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Random;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.FutureTask;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.locks.Lock;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.InternalGemFireException;
+ import com.gemstone.gemfire.StatisticsFactory;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.cache.AttributesFactory;
+ import com.gemstone.gemfire.cache.AttributesMutator;
+ import com.gemstone.gemfire.cache.Cache;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.CacheException;
+ import com.gemstone.gemfire.cache.CacheListener;
+ import com.gemstone.gemfire.cache.CacheLoader;
+ import com.gemstone.gemfire.cache.CacheLoaderException;
+ import com.gemstone.gemfire.cache.CacheStatistics;
+ import com.gemstone.gemfire.cache.CacheWriter;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.CustomExpiry;
+ import com.gemstone.gemfire.cache.DataPolicy;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.cache.EntryExistsException;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.ExpirationAttributes;
+ import com.gemstone.gemfire.cache.InterestPolicy;
+ import com.gemstone.gemfire.cache.InterestRegistrationEvent;
+ import com.gemstone.gemfire.cache.LoaderHelper;
+ import com.gemstone.gemfire.cache.LowMemoryException;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.PartitionAttributes;
+ import com.gemstone.gemfire.cache.PartitionResolver;
+ import com.gemstone.gemfire.cache.PartitionedRegionDistributionException;
+ import com.gemstone.gemfire.cache.PartitionedRegionStorageException;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionAttributes;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.RegionEvent;
+ import com.gemstone.gemfire.cache.RegionExistsException;
+ import com.gemstone.gemfire.cache.RegionMembershipListener;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
+ import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
+ import com.gemstone.gemfire.cache.TransactionException;
+ import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+ import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+ import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException;
+ import com.gemstone.gemfire.cache.execute.Function;
+ import com.gemstone.gemfire.cache.execute.FunctionContext;
+ import com.gemstone.gemfire.cache.execute.FunctionException;
+ import com.gemstone.gemfire.cache.execute.FunctionService;
+ import com.gemstone.gemfire.cache.execute.ResultCollector;
+ import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+ import com.gemstone.gemfire.cache.partition.PartitionListener;
+ import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
+ import com.gemstone.gemfire.cache.query.FunctionDomainException;
+ import com.gemstone.gemfire.cache.query.Index;
+ import com.gemstone.gemfire.cache.query.IndexCreationException;
+ import com.gemstone.gemfire.cache.query.IndexExistsException;
+ import com.gemstone.gemfire.cache.query.IndexInvalidException;
+ import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+ import com.gemstone.gemfire.cache.query.IndexType;
+ import com.gemstone.gemfire.cache.query.MultiIndexCreationException;
+ import com.gemstone.gemfire.cache.query.NameResolutionException;
+ import com.gemstone.gemfire.cache.query.QueryException;
+ import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+ import com.gemstone.gemfire.cache.query.SelectResults;
+ import com.gemstone.gemfire.cache.query.TypeMismatchException;
+ import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
+ import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+ import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
+ import com.gemstone.gemfire.cache.query.internal.QCompiler;
+ import com.gemstone.gemfire.cache.query.internal.QueryExecutor;
+ import com.gemstone.gemfire.cache.query.internal.ResultsBag;
+ import com.gemstone.gemfire.cache.query.internal.ResultsCollectionWrapper;
+ import com.gemstone.gemfire.cache.query.internal.ResultsSet;
+ import com.gemstone.gemfire.cache.query.internal.index.AbstractIndex;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
+ import com.gemstone.gemfire.cache.query.internal.index.PartitionedIndex;
+ import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+ import com.gemstone.gemfire.cache.query.types.ObjectType;
+ import com.gemstone.gemfire.cache.wan.GatewaySender;
+ import com.gemstone.gemfire.distributed.DistributedLockService;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
+ import com.gemstone.gemfire.distributed.internal.DM;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+ import com.gemstone.gemfire.distributed.internal.DistributionManager;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.DisconnectListener;
+ import com.gemstone.gemfire.distributed.internal.MembershipListener;
+ import com.gemstone.gemfire.distributed.internal.ProfileListener;
+ import com.gemstone.gemfire.distributed.internal.ReplyException;
+ import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+ import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
+ import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.NanoTimer;
+ import com.gemstone.gemfire.internal.SetUtils;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.BucketAdvisor.ServerBucketProfile;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
+ import com.gemstone.gemfire.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse;
+ import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+ import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+ import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+ import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
+ import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+ import com.gemstone.gemfire.internal.cache.execute.FunctionExecutionNodePruner;
+ import com.gemstone.gemfire.internal.cache.execute.FunctionRemoteContext;
+ import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+ import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+ import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionExecutor;
+ import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultWaiter;
+ import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
+ import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+ import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
+ import com.gemstone.gemfire.internal.cache.lru.LRUStatistics;
+ import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage.DestroyResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.DestroyRegionOnDataStoreMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpAllPRConfigMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpBucketsMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchBulkEntriesMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchBulkEntriesMessage.FetchBulkEntriesResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage.FetchEntriesResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntryMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntryMessage.FetchEntryResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchKeysMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchKeysMessage.FetchKeysResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.GetMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.GetMessage.GetResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityRequestMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityRequestMessage.IdentityResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityUpdateMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityUpdateMessage.IdentityUpdateResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.IndexCreationMsg;
+ import com.gemstone.gemfire.internal.cache.partitioned.InterestEventMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.InterestEventMessage.InterestEventResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage.InvalidateResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRSanityCheckMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage.UpdateEntryVersionResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage.PartitionResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserver;
+ import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserverHolder;
+ import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult;
+ import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+ import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor;
+ import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
+ import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.SizeMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.SizeMessage.SizeResponse;
+ import com.gemstone.gemfire.internal.cache.persistence.PRPersistentConfig;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+ import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
+ import com.gemstone.gemfire.internal.util.TransformUtils;
+ import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
+ import com.gemstone.gemfire.i18n.StringId;
+ 
+ /**
+  * A Region whose total storage is split into chunks of data (partitions) which
+  * are copied up to a configurable level (for high availability) and placed on
+  * multiple VMs for improved performance and increased storage capacity.
+  * 
+  */
+ public class PartitionedRegion extends LocalRegion implements 
+   CacheDistributionAdvisee, QueryExecutor {
+ 
+   public static final Random rand = new Random(Long.getLong(
+       "gemfire.PartitionedRegionRandomSeed", NanoTimer.getTime()).longValue());
+   
+   private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger();
+   
+   private final DiskRegionStats diskRegionStats;
+   /**
+    * Changes the scope of replication to secondary buckets to Scope.DISTRIBUTED_NO_ACK
+    */
+   public static final boolean DISABLE_SECONDARY_BUCKET_ACK = Boolean.getBoolean(
+       "gemfire.disablePartitionedRegionBucketAck");
+   
+   /**
+    * A debug flag used for testing calculation of starting bucket id
+    */
+   public static boolean BEFORE_CALCULATE_STARTING_BUCKET_FLAG = false;
+   
+   /**
+    * Thread specific random number
+    */
+   private static ThreadLocal threadRandom = new ThreadLocal() {
+     @Override
+     protected Object initialValue() {
+       int i = rand.nextInt();
+       if (i < 0) {
+         // negating Integer.MIN_VALUE overflows back to itself, so clear the sign bit instead
+         i &= Integer.MAX_VALUE;
+       }
+       return Integer.valueOf(i);
+     }
+   };
+ 
+   /**
+    * Global Region for storing PR config (PRName -> PRConfig). This region is
+    * used to resolve PR name conflicts.
+    */
+   private volatile Region<String, PartitionRegionConfig> prRoot;
+ 
+   /**
+    * The PartitionedRegionDataStore takes care of data storage for this PR.
+    * It contains the bucket Regions that store the PR's data entries.
+    */
+   protected PartitionedRegionDataStore dataStore;
+ 
+   /**
+    * The advisor that holds information about this partitioned region
+    */
+   private final RegionAdvisor distAdvisor;
+ 
+   /** Logging mechanism for debugging */
+   private static final Logger logger = LogService.getLogger();
+ 
+   /** cleanup flags * */
+   private boolean cleanPRRegistration = false;
+ 
+   /** Time to wait for acquiring distributed lock ownership */
+   final static long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter
+       .parseLong(
+           System
+               .getProperty(PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_PROPERTY),
+           PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_DEFAULT);
+ 
+   /**
+    * default redundancy level is 0.
+    */
+   final int redundantCopies;
+ 
+   /**
+    * The minimum amount of redundancy needed for a write operation
+    */
+   final int minimumWriteRedundancy;
+ 
+   /**
+    * The minimum amount of redundancy needed for a read operation
+    */
+   final int minimumReadRedundancy;
+ 
+   /**
+    * Ratio of currently allocated memory to maxMemory that triggers rebalance
+    * activity.
+    */
+   final static float rebalanceThreshold = 0.75f;
+ 
+   /** The maximum memory allocated for this node in MB */
+   final int localMaxMemory;
+ 
+   /** The maximum milliseconds for retrying operations */
+   final private int retryTimeout;
+ 
+   /**
+    * The statistics for this PR
+    */
+   public final PartitionedRegionStats prStats;
+ 
+   // private Random random = new Random(System.currentTimeMillis());
+ 
+   /** Total number of buckets configured for this PR */
+   private final int totalNumberOfBuckets;
+ 
+   /**
+    * To check if local cache is enabled.
+    */
+   private static final boolean localCacheEnabled = false;
+ 
+   // private static final boolean throwIfNoNodesLeft = true;
+ 
+   public static final int DEFAULT_RETRY_ITERATIONS = 3;
+ 
+   /**
+    * Flag to indicate if a cache loader is present
+    */
+   private volatile boolean haveCacheLoader;
+ 
+   /**
+    * Region identifier used for DLocks (Bucket and Region)
+    */
+   private final String regionIdentifier;
+ 
+   /**
+    * Maps each PR to a prId. This prId will uniquely identify the PR.
+    */
+   static final PRIdMap prIdToPR = new PRIdMap();
+ 
+   /**
+    * Flag to indicate whether region is closed
+    * 
+    */
+   public volatile boolean isClosed = false;
+ 
+   /**
+    * a flag indicating that the PR is destroyed in this VM
+    */
+   public volatile boolean isLocallyDestroyed = false;
+   
+   /**
+    * The thread locally destroying this PR. Not volatile,
+    * so always check isLocallyDestroyed before checking locallyDestroyingThread.
+    * 
+    * Concurrency: {@link #isLocallyDestroyed} is volatile
+    */
+   public Thread locallyDestroyingThread;
+ 
+   /** true if a {@link PartitionedIndex} has been created on this region */
+   private volatile boolean hasPartitionedIndex = false;
+ 
+   /**
+    * regionMembershipListener notification requires this to be plugged into
+    * a PR's RegionAdvisor
+    */
+   private final AdvisorListener advisorListener = new AdvisorListener();
+ 
+   /*
+    * Map containing <IndexTask, FutureTask<IndexTask> or Index>.
+    * IndexTask represents an index that is completely created or
+    * one that is in the create phase. This is done in order to avoid
+    * synchronization on the indexes.
+    */
+   private final ConcurrentMap indexes = new ConcurrentHashMap();
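+   // Sketch of the memoizing-future idiom described above (illustrative only;
+   // "createIndexCallable" and "indexTask" are placeholder names, not fields
+   // of this class):
+   //   FutureTask<Index> task = new FutureTask<Index>(createIndexCallable);
+   //   Object prev = indexes.putIfAbsent(indexTask, task);
+   //   if (prev == null) {
+   //     task.run();   // this thread performs the index creation
+   //   }               // otherwise wait on the Future already in the map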
+ 
+   private volatile boolean recoveredFromDisk;
+ 
+   public static final int RUNNING_MODE = -1;
+   public static final int PRIMARY_BUCKETS_LOCKED = 1;
+   public static final int DISK_STORE_FLUSHED = 2;
+   public static final int OFFLINE_EQUAL_PERSISTED = 3;
+ 
+   private volatile int shutDownAllStatus = RUNNING_MODE;
+   
+   private final long birthTime = System.currentTimeMillis();
+ 
+   public void setShutDownAllStatus(int newStatus) {
+     this.shutDownAllStatus = newStatus;
+   }
+   
+   private final PartitionedRegion colocatedWithRegion;
+ 
+   private List<BucketRegion> sortedBuckets; 
+   
+   private ScheduledExecutorService bucketSorter;
+ 
+   private ConcurrentMap<String, Integer[]> partitionsMap = new ConcurrentHashMap<String, Integer[]>();
+ 
+   public ConcurrentMap<String, Integer[]> getPartitionsMap() {
+     return this.partitionsMap;
+   }
+   /**
+    * for WAN shadow PR
+    */
+   private boolean enableConflation;
+  
+   private final Object indexLock = new Object();
+  
+   /**
+    * Byte 0 = no NWHOP; Byte 1 = NWHOP to servers in same server-grp;
+    * Byte 2 = NWHOP to servers in other server-grp
+    */
+   private final ThreadLocal<Byte> isNetworkHop = new ThreadLocal<Byte>() {
+     @Override
+     protected Byte initialValue() {
+       return Byte.valueOf((byte)0);
+     }
+   };
+ 
+   public void setIsNetworkHop(Byte value) {
+     this.isNetworkHop.set(value);
+   }
+ 
+   public Byte isNetworkHop() {
+     return this.isNetworkHop.get();
+   }
+   
+   private final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() {
+     @Override
+     protected Byte initialValue() {
+       return 0;
+     }
+   };
+ 
+   public void setMetadataVersion(Byte value) {
+     this.metadataVersion.set(value);
+   }
+ 
+   public Byte getMetadataVersion() {
+     return this.metadataVersion.get();
+   }
+       
+ 
+   /**
+    * Returns the LRUStatistics for this PR.
+    * This is needed to find the single instance of LRUStatistics
+    * created early for a PR when it is recovered from disk.
+    * This fixes bug 41938
+    */
+   public LRUStatistics getPRLRUStatsDuringInitialization() {
+     LRUStatistics result = null;
+     if (getDiskStore() != null) {
+       result = getDiskStore().getPRLRUStats(this);
+     }
+     return result;
+   }
+                
+   
+   //////////////////  ConcurrentMap methods //////////////////               
+           
+   @Override
+   public boolean remove(Object key, Object value, Object callbackArg) {
+     final long startTime = PartitionedRegionStats.startTime();
+     try {
+       return super.remove(key, value, callbackArg);
+     }
+     finally {
+       this.prStats.endDestroy(startTime);
+     }
+   }
+    
+    
+                
+    //////////////////  End of ConcurrentMap methods ////////////////// 
+                
+ 
+   public PartitionListener[] getPartitionListeners() {
+     return this.partitionListeners;
+   }
+   
+   /**
+    * Return the canonical representation for a bucket (for logging),
+    * e.g. {@code "5:12"} for PR id 5 and bucket id 12.
+    * 
+    * @param bucketId
+    *                the bucket
+    * @return a String representing this PR and the bucket
+    */
+   public String bucketStringForLogs(int bucketId) {
+     return getPRId() + BUCKET_ID_SEPARATOR + bucketId;
+   }
+ 
+   /** Separator between PRId and bucketId for creating bucketString */
+   public static final String BUCKET_ID_SEPARATOR = ":";
+ 
+   /**
+    * Clear the prIdMap, typically used when disconnecting from the distributed
+    * system or clearing the cache
+    */
+   public static void clearPRIdMap() {
+     synchronized (prIdToPR) {
+       prIdToPR.clear();
+     }
+   }
+ 
+   private static DisconnectListener dsPRIdCleanUpListener = new DisconnectListener() {
+     @Override
+     public String toString() {
+       return LocalizedStrings.PartitionedRegion_SHUTDOWN_LISTENER_FOR_PARTITIONEDREGION.toLocalizedString();
+     }
+ 
+     public void onDisconnect(InternalDistributedSystem sys) {
+       clearPRIdMap();
+     }
+   };
+ 
+ 
+   public static class PRIdMap extends HashMap {
+     private static final long serialVersionUID = 3667357372967498179L;
+     public final static String DESTROYED = "Partitioned Region Destroyed";
+ 
+     final static String LOCALLY_DESTROYED = "Partitioned Region Is Locally Destroyed";
+ 
+     final static String FAILED_REGISTRATION = "Partitioned Region's Registration Failed";
+ 
+     public final static String NO_PATH_FOUND = "NoPathFound";
+ 
+     private volatile boolean cleared = true;
+ 
+     @Override
+     public Object get(Object key) {
+       throw new UnsupportedOperationException(LocalizedStrings.PartitionedRegion_PRIDMAPGET_NOT_SUPPORTED_USE_GETREGION_INSTEAD.toLocalizedString());
+     }
+ 
+     public Object getRegion(Object key) throws PRLocallyDestroyedException {
+       if (cleared) {
+         Cache c = GemFireCacheImpl.getInstance();
+         if (c == null) {
+           throw new CacheClosedException();
+         }
+         else {
+           c.getCancelCriterion().checkCancelInProgress(null);
+         }
+       }
+       Assert.assertTrue(key instanceof Integer);
+ 
+       Object o = super.get(key);
+       if (o == DESTROYED) {
+         throw new RegionDestroyedException(LocalizedStrings.PartitionedRegion_REGION_FOR_PRID_0_IS_DESTROYED.toLocalizedString(key), NO_PATH_FOUND);
+       }
+       if (o == LOCALLY_DESTROYED) {
+         throw new PRLocallyDestroyedException(LocalizedStrings.PartitionedRegion_REGION_WITH_PRID_0_IS_LOCALLY_DESTROYED_ON_THIS_NODE.toLocalizedString(key));
+       }
+       if (o == FAILED_REGISTRATION) {
+         throw new PRLocallyDestroyedException(LocalizedStrings.PartitionedRegion_REGION_WITH_PRID_0_FAILED_INITIALIZATION_ON_THIS_NODE.toLocalizedString(key));
+       }
+       return o;
+     }
+ 
+     @Override
+     public Object remove(final Object key) {
+       return put(key, DESTROYED, true);
+     }
+ 
+     @Override
+     public Object put(final Object key, final Object value) {
+       return put(key, value, true);
+     }
+ 
+     public Object put(final Object key, final Object value,
+         boolean sendIdentityRequestMessage) {
+       if (cleared) {
+         cleared = false;
+       }
+ 
+       if (key == null) {
+         throw new NullPointerException(LocalizedStrings.PartitionedRegion_NULL_KEY_NOT_ALLOWED_FOR_PRIDTOPR_MAP.toLocalizedString());
+       }
+       if (value == null) {
+         throw new NullPointerException(LocalizedStrings.PartitionedRegion_NULL_VALUE_NOT_ALLOWED_FOR_PRIDTOPR_MAP.toLocalizedString());
+       }
+       Assert.assertTrue(key instanceof Integer);
+       if (sendIdentityRequestMessage)
+         IdentityRequestMessage.setLatestId(((Integer)key).intValue());
+       if ((super.get(key) == DESTROYED) && (value instanceof PartitionedRegion)) {
+         PartitionedRegionException pre = new PartitionedRegionException(LocalizedStrings.PartitionedRegion_CAN_NOT_REUSE_OLD_PARTITIONED_REGION_ID_0.toLocalizedString(key));
+         throw pre;
+       }
+       return super.put(key, value);
+     }
+ 
+     @Override
+     public void clear() {
+       this.cleared = true;
+       super.clear();
+     }
+ 
+     public synchronized String dump() {
+       StringBuffer b = new StringBuffer("prIdToPR Map@");
+       b.append(System.identityHashCode(prIdToPR)).append(":\n");
+       Map.Entry me;
+       for (Iterator i = prIdToPR.entrySet().iterator(); i.hasNext();) {
+         me = (Map.Entry)i.next();
+         b.append(me.getKey()).append("=>").append(me.getValue());
+         if (i.hasNext()) {
+           b.append("\n");
+         }
+       }
+       return b.toString();
+     }
+   }
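+   // Illustrative PRIdMap lifecycle, inferred from the overrides above
+   // ("prIdInteger" is a placeholder for an Integer PR id):
+   //   prIdToPR.put(prIdInteger, partitionedRegion);  // register a new PR id
+   //   prIdToPR.getRegion(prIdInteger);               // look up; throws if destroyed
+   //   prIdToPR.remove(prIdInteger);                  // maps the id to DESTROYED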
+ 
+   private int partitionedRegionId = -3;
+ 
+   // final private Scope userScope;
+ 
+   /** Node description */
+   final private Node node;
+ 
+   /** Helper Object for redundancy Management of PartitionedRegion */
+   private final PRHARedundancyProvider redundancyProvider;
+ 
+   /**
+    * flag saying whether this VM needs cache operation notifications from other
+    * members
+    */
+   private boolean requiresNotification;
+ 
+   /**
+    * Latch that signals when the Bucket meta-data is ready to receive updates
+    */
+   private final StoppableCountDownLatch initializationLatchAfterBucketIntialization;
+ 
+   /**
+    * Constructor for a PartitionedRegion. This provides accessor (Region API)
+    * functionality and contains a datastore for actual storage. An accessor can
+    * act as a local cache by having local storage enabled. A PartitionedRegion
+    * can be created by a factory method of RegionFactory.java or by
+    * invoking Cache.createRegion(). (Cache.xml etc. to be added)
+    * 
+    */
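+   // A minimal creation sketch for the constructor javadoc above, using the
+   // public API (the region name and attribute values are illustrative only):
+   //   Cache cache = new CacheFactory().create();
+   //   PartitionAttributes pa = new PartitionAttributesFactory()
+   //       .setRedundantCopies(1)
+   //       .setTotalNumBuckets(113)
+   //       .create();
+   //   Region region = cache.createRegionFactory(RegionShortcut.PARTITION)
+   //       .setPartitionAttributes(pa)
+   //       .create("exampleRegion");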
+ 
+   static public final String RETRY_TIMEOUT_PROPERTY = 
+       "gemfire.partitionedRegionRetryTimeout";
+   
+   private final PartitionRegionConfigValidator validator ;
+   
+   final List<FixedPartitionAttributesImpl> fixedPAttrs;
+   
+   private byte fixedPASet;
+   
+   public List<PartitionedRegion> colocatedByList= new CopyOnWriteArrayList<PartitionedRegion>();
+   
+   private final PartitionListener[] partitionListeners;
+ 
+   private boolean isShadowPR = false;
+   private boolean isShadowPRForHDFS = false;
+   
+   private AbstractGatewaySender parallelGatewaySender = null;
+   
+   private final ThreadLocal<Boolean> queryHDFS = new ThreadLocal<Boolean>() {
+     @Override
+     protected Boolean initialValue() {
+       return false;
+     }
+   };
+   
+   public PartitionedRegion(String regionname, RegionAttributes ra,
+       LocalRegion parentRegion, GemFireCacheImpl cache,
+       InternalRegionArguments internalRegionArgs) {
+     super(regionname, ra, parentRegion, cache, internalRegionArgs);
+ 
+     this.node = initializeNode();
+     this.prStats = new PartitionedRegionStats(cache.getDistributedSystem(), getFullPath());
+     this.regionIdentifier = getFullPath().replace('/', '#');
+ 
+     if (logger.isDebugEnabled()) {
+       logger.debug("Constructing Partitioned Region {}", regionname);
+     }
+ 
+     // By adding this disconnect listener we ensure that the prIdMap is cleaned
+     // up upon distributed system disconnect even if this (or other) PRs are
+     // destroyed (which would otherwise prevent prIdMap cleanup).
+     cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
+     
+     // add an async queue for the region if the store name is not null. 
+     if (this.getHDFSStoreName() != null) {
+       String eventQueueName = getHDFSEventQueueName();
+       super.addAsyncEventQueueId(eventQueueName);
+     }
+ 
+     // this.userScope = ra.getScope();
+     this.partitionAttributes = ra.getPartitionAttributes();
+     this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory();
+     this.retryTimeout = Integer.getInteger(RETRY_TIMEOUT_PROPERTY,
+         PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION).intValue();
+     this.totalNumberOfBuckets = this.partitionAttributes.getTotalNumBuckets();
+     this.prStats.incTotalNumBuckets(this.totalNumberOfBuckets);
+     this.distAdvisor = RegionAdvisor.createRegionAdvisor(this); // Warning: potential early escape of instance
+     this.redundancyProvider = new PRHARedundancyProvider(this); // Warning: potential early escape of instance
+ 
+     // localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled();
+     // This is to make sure that local-cache get and put works properly.
+     // getScope is overridden to return the correct scope.
+     // this.scope = Scope.LOCAL;
+     this.redundantCopies = ra.getPartitionAttributes().getRedundantCopies();
+     this.prStats.setConfiguredRedundantCopies(ra.getPartitionAttributes().getRedundantCopies());
+     this.prStats.setLocalMaxMemory(ra.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024);
+     
+     // No redundancy required for writes
+     this.minimumWriteRedundancy = Integer.getInteger(
+         "gemfire.mimimumPartitionedRegionWriteRedundancy", 0).intValue();
+     // No redundancy required for reads
+     this.minimumReadRedundancy = Integer.getInteger(
+         "gemfire.mimimumPartitionedRegionReadRedundancy", 0).intValue();
+ 
+     this.haveCacheLoader = ra.getCacheLoader() != null;
+ 
+     this.initializationLatchAfterBucketIntialization = new StoppableCountDownLatch(
+         this.getCancelCriterion(), 1);
+     
+     this.validator = new PartitionRegionConfigValidator(this);
+     this.partitionListeners = this.partitionAttributes.getPartitionListeners(); 
+ 
+     this.colocatedWithRegion = ColocationHelper.getColocatedRegion(this);
+     if (colocatedWithRegion != null) {
+       // In a colocation chain, the child region inherits the fixed partition attributes from the parent region.
+       this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl();
+       this.fixedPASet = colocatedWithRegion.fixedPASet;
+       synchronized (colocatedWithRegion.colocatedByList) {
+         colocatedWithRegion.colocatedByList.add(this);
+       }
+     }
+     else {
+       this.fixedPAttrs = this.partitionAttributes.getFixedPartitionAttributes();
+       this.fixedPASet = 0;
+     }
+     if (logger.isDebugEnabled()) {
+       logger.debug("Partitioned Region {} constructed {}", regionname, (this.haveCacheLoader ? "with a cache loader" : ""));
+     }
+     if (this.getEvictionAttributes() != null
+         && this.getEvictionAttributes().getAlgorithm().isLRUHeap()) {
+       this.sortedBuckets = new ArrayList<BucketRegion>();
+       final ThreadGroup grp = LoggingThreadGroup.createThreadGroup("BucketSorterThread", logger);
+       ThreadFactory tf = new ThreadFactory() {
+         public Thread newThread(Runnable r) {
+           Thread t = new Thread(grp, r, "BucketSorterThread");
+           t.setDaemon(true);
+           return t;
+         }
+       };
+       this.bucketSorter = Executors.newScheduledThreadPool(1, tf);
+     }
+     // If eviction overflows to disk or the region is persistent, create DiskRegionStats
+     if ((this.getEvictionAttributes() != null
+         && !this.getEvictionAttributes().getAlgorithm().isNone()
+         && this.getEvictionAttributes().getAction().isOverflowToDisk())
+         || this.getDataPolicy().withPersistence()) {
+       StatisticsFactory sf = this.getCache().getDistributedSystem();
+       this.diskRegionStats = new DiskRegionStats(sf, getFullPath());
+     } else {
+       this.diskRegionStats = null;
+     }
+     if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
+       this.isShadowPR = true;
+       this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender();
+       if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue())
+         this.isShadowPRForHDFS = true;
+     }
+     
+     
+     /*
+      * Start persistent profile logging if we are a persistent region.
+      */
+     if(dataPolicy.withPersistence()) {
+       startPersistenceProfileLogging();      
+     }
+   }
+ 
+   /**
+    * Monitors when other members that participate in this persistent region are removed and creates
+    * a log entry marking the event.
+    */
+   private void startPersistenceProfileLogging() {
+     this.distAdvisor.addProfileChangeListener(new ProfileListener() {
+       @Override
+       public void profileCreated(Profile profile) {
+       }
+ 
+       @Override
+       public void profileUpdated(Profile profile) {
+       }
+       
+       @Override
+       public void profileRemoved(Profile profile, boolean destroyed) {
+         /*
+          * Don't bother logging membership activity if our region isn't ready.
+          */
+         if(isInitialized()) {
+           CacheProfile cacheProfile = ((profile instanceof CacheProfile) ? (CacheProfile) profile : null);
+           Set<String> onlineMembers = new HashSet<String>();
+ 
+           TransformUtils.transform(PartitionedRegion.this.distAdvisor.advisePersistentMembers().values(),onlineMembers,TransformUtils.persistentMemberIdToLogEntryTransformer);
+ 
+           logger.info(LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_PERSISTENT_VIEW,
+               new Object[] {PartitionedRegion.this.getName(),TransformUtils.persistentMemberIdToLogEntryTransformer.transform(cacheProfile.persistentID),onlineMembers}));                          
+         }
+       }      
+     });
+   }
+ 
+   @Override
+   public final boolean isHDFSRegion() {
+     return this.getHDFSStoreName() != null;
+   }
+ 
+   @Override
+   public final boolean isHDFSReadWriteRegion() {
+     return isHDFSRegion() && !getHDFSWriteOnly();
+   }
+ 
+   @Override
+   protected final boolean isHDFSWriteOnly() {
+     return isHDFSRegion() && getHDFSWriteOnly();
+   }
+ 
+   public final void setQueryHDFS(boolean includeHDFS) {
+     queryHDFS.set(includeHDFS);
+   }
+ 
+   @Override
+   public final boolean includeHDFSResults() {
+     return queryHDFS.get();
+   }
+ 
+   public final boolean isShadowPR() {
+     return isShadowPR;
+   }
+ 
+   public final boolean isShadowPRForHDFS() {
+     return isShadowPRForHDFS;
+   }
+   
+   public AbstractGatewaySender getParallelGatewaySender() {
+     return parallelGatewaySender;
+   }
+   
+   public Set<String> getParallelGatewaySenderIds() {
+     Set<String> regionGatewaySenderIds = this.getAllGatewaySenderIds();
+     if (regionGatewaySenderIds.isEmpty()) {
+       return Collections.EMPTY_SET;
+     }
+     Set<GatewaySender> cacheGatewaySenders = getCache().getAllGatewaySenders();
+     Set<String> parallelGatewaySenderIds = new HashSet<String>();
+     for (GatewaySender sender : cacheGatewaySenders) {
+       if (regionGatewaySenderIds.contains(sender.getId())
+           && sender.isParallel()) {
+         parallelGatewaySenderIds.add(sender.getId());
+       }
+     }
+     return parallelGatewaySenderIds;
+   }
+   
+   List<PartitionedRegion> getColocatedByList() {
+     return this.colocatedByList;
+   }
+ 
+   public boolean isColocatedBy() {
+     return !this.colocatedByList.isEmpty();
+   } 
+ 
+   private void createAndValidatePersistentConfig() {
+     DiskStoreImpl dsi = this.getDiskStore();
+     if (this.dataPolicy.withPersistence() && !this.concurrencyChecksEnabled
+         && supportsConcurrencyChecks()) {
+       logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_ENABLING_CONCURRENCY_CHECKS_FOR_PERSISTENT_PR, this.getFullPath()));
+       this.concurrencyChecksEnabled = true;
+     }
+     if (dsi != null && this.getDataPolicy().withPersistence()) {
+       String colocatedWith = colocatedWithRegion == null 
+           ? "" : colocatedWithRegion.getFullPath(); 
+       PRPersistentConfig config = dsi.getPersistentPRConfig(this.getFullPath());
+       if(config != null) {
+         if (config.getTotalNumBuckets() != this.getTotalNumberOfBuckets()) {
+           Object[] prms = new Object[] { this.getFullPath(), this.getTotalNumberOfBuckets(),
+               config.getTotalNumBuckets() };
+           IllegalStateException ise = new IllegalStateException(
+               LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms));
+           throw ise;
+         }
+         //Make sure we don't change to be colocated with a different region
+         //We also can't change from colocated to not colocated without writing
+         //a record to disk, so we won't allow that right now either.
+         if (!colocatedWith.equals(config.getColocatedWith())) {
+           Object[] prms = new Object[] { this.getFullPath(), colocatedWith,
+               config.getColocatedWith() };
+           DiskAccessException dae = new DiskAccessException(LocalizedStrings.LocalRegion_A_DISKACCESSEXCEPTION_HAS_OCCURED_WHILE_WRITING_TO_THE_DISK_FOR_REGION_0_THE_REGION_WILL_BE_CLOSED.toLocalizedString(this.getFullPath()), null, dsi);
+           dsi.handleDiskAccessException(dae);
+           IllegalStateException ise = new IllegalStateException(
+               LocalizedStrings.PartitionedRegion_FOR_REGION_0_ColocatedWith_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms));
+           throw ise;
+         }
+       } else {
+         
+         config= new PRPersistentConfig(this.getTotalNumberOfBuckets(), 
+             colocatedWith);
+         dsi.addPersistentPR(this.getFullPath(), config);
+         //Fix for support issue 7870 - the parent region needs to be able
+         //to discover that there is a persistent colocated child region. So
+         //if this is a child region, persist its config to the parent disk store
+         //as well.
+         if(colocatedWithRegion != null 
+             && colocatedWithRegion.getDiskStore() != null
+             && colocatedWithRegion.getDiskStore() != dsi) {
+           colocatedWithRegion.getDiskStore().addPersistentPR(this.getFullPath(), config);
+         }
+       }
+       
+     }
+   }
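+   // Example of the guard above (illustrative): a PR persisted with
+   // total-num-buckets 113 and restarted with total-num-buckets 226 fails
+   // fast with the IllegalStateException rather than remapping buckets.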
+   
+   /**
+    * Initializes the PartitionedRegion meta data, adding this Node and starting
+    * the service on this node (if not already started).
+    * Made this synchronized for bug 41982
+    * @return true if initialize was done; false if not because it was destroyed
+    */
+   private synchronized boolean initPRInternals(InternalRegionArguments internalRegionArgs) {
+     
+     if (this.isLocallyDestroyed) {
+       // don't initialize if we are already destroyed for bug 41982
+       return false;
+     }
+     /* Initialize the PartitionRegion */
+     if (cache.isCacheAtShutdownAll()) {
+       throw new CacheClosedException("Cache is shutting down");
+     }
+     validator.validateColocation();
+     
+     //Do this after the validation, to avoid creating a persistent config
+     //for an invalid PR.
+     createAndValidatePersistentConfig();
+     initializePartitionedRegion();
+     
+     /* set the total number of buckets */
+     // setTotalNumOfBuckets();
+     // If localMaxMemory is set to 0, do not initialize Data Store.
+     final boolean storesData = this.localMaxMemory > 0;
+     if (storesData) {
+       initializeDataStore(this.getAttributes());
+     }
+ 
+     // register this PartitionedRegion, Create a PartitionRegionConfig and bind
+     // it into the allPartitionedRegion system wide Region.
+     // IMPORTANT: do this before advising peers that we have this region
+     registerPartitionedRegion(storesData);
+     
+     getRegionAdvisor().initializeRegionAdvisor(); // must be BEFORE initializeRegion call
+     getRegionAdvisor().addMembershipListener(this.advisorListener); // fix for bug 38719
+ 
+     // 3rd part of eviction attributes validation, after eviction attributes
+     // have potentially been published (by the first VM) but before buckets are created
+     validator.validateEvictionAttributesAgainstLocalMaxMemory();
+     validator.validateFixedPartitionAttributes();
+ 
+     // Register with the other Nodes that have this region defined, this
+     // allows for an Advisor profile exchange, also notifies the Admin
+     // callbacks that this Region is created.
+     try {
+       new CreateRegionProcessor(this).initializeRegion();
+     } catch (IllegalStateException e) {
+       // If this is a PARTITION_PROXY then retry region creation
+       // after toggling the concurrencyChecksEnabled flag. This is
+       // required because for persistent regions, we enforce concurrencyChecks
+       if (!this.isDataStore() && supportsConcurrencyChecks()) {
+         this.concurrencyChecksEnabled = !this.concurrencyChecksEnabled;
+         new CreateRegionProcessor(this).initializeRegion();
+       } else {
+         throw e;
+       }
+     }
+ 
+     if (!this.isDestroyed && !this.isLocallyDestroyed) {
+       // Register at this point so that other members are known
+       this.cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
+     }
+     
+     // Create OQL indexes before starting GII.
+     createOQLIndexes(internalRegionArgs);
+     
+     // if any other services are dependent on notifications from this region,
+     // then we need to make sure that in-process ops are distributed before
+     // releasing the GII latches
+     if (this.isAllEvents()) {
+       StateFlushOperation sfo = new StateFlushOperation(getDistributionManager());
+       try {
+         sfo.flush(this.distAdvisor.adviseAllPRNodes(),
+           getDistributionManager().getId(),
+           DistributionManager.HIGH_PRIORITY_EXECUTOR, false);
+       } catch (InterruptedException ie) {
+         Thread.currentThread().interrupt();
+         getCancelCriterion().checkCancelInProgress(ie);
+       }
+     }
+ 
+     releaseBeforeGetInitialImageLatch(); // moved to this spot for bug 36671
+ 
+     // requires prid assignment mthomas 4/3/2007
+     getRegionAdvisor().processProfilesQueuedDuringInitialization(); 
+ 
+     releaseAfterBucketMetadataSetupLatch();
+         
+     try {
+       if(storesData) {
+         if(this.redundancyProvider.recoverPersistentBuckets()) {
+           //Mark members as recovered from disk recursively, starting
+           //with the leader region.
+           PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(this);
+           markRecoveredRecursively(leaderRegion);
+         }
+       }
+     }
+     catch (RegionDestroyedException rde) {
+       // Do nothing.
+       if (logger.isDebugEnabled()) {
+         logger.debug("initPRInternals: failed due to exception", rde);
+       }
+     }
+ 
+     releaseAfterGetInitialImageLatch();
+ 
+     try {
+       if(storesData) {
+         this.redundancyProvider.scheduleCreateMissingBuckets();
+ 
+         if (this.redundantCopies > 0) {
+           this.redundancyProvider.startRedundancyRecovery();
+         }
+       }
+     }
+     catch (RegionDestroyedException rde) {
+       // Do nothing.
+       if (logger.isDebugEnabled()) {
+         logger.debug("initPRInternals: failed due to exception", rde);
+       }
+     }
+ 
+     return true;
+   }
+   
+   private void markRecoveredRecursively(PartitionedRegion region) {
+     region.setRecoveredFromDisk();
+     for(PartitionedRegion colocatedRegion : ColocationHelper.getColocatedChildRegions(region)) {
+       markRecoveredRecursively(colocatedRegion);
+     }
+   }
+ 
+   @Override
+   protected void postCreateRegion() {
+     super.postCreateRegion();
+     CacheListener[] listeners = fetchCacheListenersField();
+     if (listeners != null && listeners.length > 0) {
+       Set others = getRegionAdvisor().adviseGeneric();
+       for (int i = 0; i < listeners.length; i++) {
+         if (listeners[i] instanceof RegionMembershipListener) {
+           RegionMembershipListener rml = (RegionMembershipListener)listeners[i];
+           try {
+             DistributedMember[] otherDms = new DistributedMember[others
+                 .size()];
+             others.toArray(otherDms);
+             rml.initialMembers(this, otherDms);
+           }
+           catch (VirtualMachineError err) {
+             SystemFailure.initiateFailure(err);
+             // If this ever returns, rethrow the error.  We're poisoned
+             // now, so don't let this thread continue.
+             throw err;
+           }
+           catch (Throwable t) {
+             // Whenever you catch Error or Throwable, you must also
+             // catch VirtualMachineError (see above).  However, there is
+             // _still_ a possibility that you are dealing with a cascading
+             // error condition, so you also need to check to see if the JVM
+             // is still usable:
+             SystemFailure.checkFailure();
+             logger.error(LocalizedMessage.create(LocalizedStrings.DistributedRegion_EXCEPTION_OCCURRED_IN_REGIONMEMBERSHIPLISTENER), t);
+           }
+         }
+       }
+     }
+     
+     PartitionListener[] partitionListeners = this.getPartitionListeners();
+     if (partitionListeners != null && partitionListeners.length != 0) {
+       for (int i = 0; i < partitionListeners.length; i++) {
+         PartitionListener listener = partitionListeners[i];
+         if (listener != null) {
+           listener.afterRegionCreate(this);
+         }
+       }     
+     }
+ 
+     Set<String> allGatewaySenderIds = getAllGatewaySenderIds();
+     if (!allGatewaySenderIds.isEmpty()) {
+       for (GatewaySender sender : cache.getAllGatewaySenders()) {
+         if (sender.isParallel()
+             && allGatewaySenderIds.contains(sender.getId())) {
+           /**
+            * get the ParallelGatewaySender to create the colocated partitioned
+            * region for this region.
+            */
+           if (sender.isRunning() ) {
+             AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender;
+             ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(new RegionQueue[1])[0])
+                 .addShadowPartitionedRegionForUserPR(this);
+           }
+         }
+       }
+     }   
+   }
+ 
+   /*
+    * Initializes the PartitionedRegion. OVERRIDES
+    */
+   @Override
+   protected void initialize(InputStream snapshotInputStream,
+       InternalDistributedMember imageTarget,
+       InternalRegionArguments internalRegionArgs) throws TimeoutException,
+       ClassNotFoundException {
+     if (logger.isDebugEnabled()) {
+       logger.debug("PartitionedRegion#initialize {}", getName());
+     }
+     RegionLogger.logCreate(getName(), getDistributionManager().getDistributionManagerId());
+ 
+     this.requiresNotification = this.cache.requiresNotificationFromPR(this);
+     initPRInternals(internalRegionArgs);
+ 
+     if (logger.isDebugEnabled()) {
+       logger.debug("PartitionRegion#initialize: finished with {}", this);
+     }
+     this.cache.addPartitionedRegion(this);
+ 
+   }
+ 
+   /**
+    * Initializes the Node for this PartitionedRegion.
+    */
+   private Node initializeNode() {
+     return new Node(getDistributionManager().getId(),
+         SERIAL_NUMBER_GENERATOR.getAndIncrement());
+   }
+ 
+   /**
+    * receive notification that a bridge server or wan gateway has been created
+    * that requires notification of cache events from this region
+    */
+   public void cacheRequiresNotification() {
+     if (!this.requiresNotification
+         && !(this.isClosed || this.isLocallyDestroyed)) {
+       // tell others of the change in status
+       this.requiresNotification = true;
+       new UpdateAttributesProcessor(this).distribute(false);
+     }    
+   }
+   
+   @Override
+   void distributeUpdatedProfileOnHubCreation()
+   {
+     if (!(this.isClosed || this.isLocallyDestroyed)) {
+       // tell others of the change in status
+       this.requiresNotification = true;
+       new UpdateAttributesProcessor(this).distribute(false);      
+     }
+   }
+ 
+   @Override
+   void distributeUpdatedProfileOnSenderCreation()
+   {
+     if (!(this.isClosed || this.isLocallyDestroyed)) {
+       // tell others of the change in status
+       this.requiresNotification = true;
+       new UpdateAttributesProcessor(this).distribute(false);      
+     }
+   }
+   
+   public void addGatewaySenderId(String gatewaySenderId) {
+     super.addGatewaySenderId(gatewaySenderId);
+     new UpdateAttributesProcessor(this).distribute();
+     ((PartitionedRegion)this).distributeUpdatedProfileOnSenderCreation();
+     GatewaySender sender = getCache().getGatewaySender(gatewaySenderId);
+     if (sender!= null && sender.isParallel() && sender.isRunning()) {
+       AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender;
+       ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(
+           new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this);
+     }
+   }
+   
+   public void removeGatewaySenderId(String gatewaySenderId){
+     super.removeGatewaySenderId(gatewaySenderId);
+     new UpdateAttributesProcessor(this).distribute();
+   }
+   
+   public void addAsyncEventQueueId(String asyncEventQueueId) {
+     super.addAsyncEventQueueId(asyncEventQueueId);
+     new UpdateAttributesProcessor(this).distribute();
+     ((PartitionedRegion)this).distributeUpdatedProfileOnSenderCreation();
+     GatewaySender sender = getCache().getGatewaySender(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncEventQueueId));
+     if (sender!= null && sender.isParallel() && sender.isRunning()) {
+       AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender;
+       ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(
+           new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this);
+     }
+   }
+   
+   public void removeAsyncEventQueueId(String asyncEventQueueId) {
+     super.removeAsyncEventQueueId(asyncEventQueueId);
+     new UpdateAttributesProcessor(this).distribute();
+   }
+   
+   public void checkSameSenderIdsAvailableOnAllNodes() {
+     List senderIds = this.getCacheDistributionAdvisor()
+         .adviseSameGatewaySenderIds(getGatewaySenderIds());
+     if (!senderIds.isEmpty()) {
+       throw new GatewaySenderConfigurationException(
+           LocalizedStrings.Region_REGION_0_HAS_1_GATEWAY_SENDER_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_GATEWAY_SENDER_IDS_SHOULD_BE_SAME
+               .toLocalizedString(new Object[] { this.getName(),
+                   senderIds.get(0), senderIds.get(1) }));
+     }
+ 
+     List asyncQueueIds = this.getCacheDistributionAdvisor()
+         .adviseSameAsyncEventQueueIds(getAsyncEventQueueIds());
+     if (!asyncQueueIds.isEmpty()) {
+       throw new GatewaySenderConfigurationException(
+           LocalizedStrings.Region_REGION_0_HAS_1_ASYNC_EVENT_QUEUE_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_ASYNC_EVENT_QUEUE_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_ASYNC_EVENT_QUEUE_IDS_SHOULD_BE_SAME
+               .toLocalizedString(new Object[] { this.getName(),
+                   asyncQueueIds.get(0), asyncQueueIds.get(1) }));
+     }
+   }
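+ 
+   // Note on the checks above: the advise*Ids() calls return an empty list
+   // when every member's sender/queue configuration agrees; a non-empty
+   // result carries the two conflicting id sets that get formatted into the
+   // exception message (get(0) and get(1) above).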
+   
+   /**
+    * Initializes the PartitionedRegion - creates the global root region used
+    * for storing the PartitionedRegion configs.
+    */
+   private void initializePartitionedRegion() {
+     this.prRoot = PartitionedRegionHelper.getPRRoot(getCache());
+   }
+ 
+   @Override
+   public void remoteRegionInitialized(CacheProfile profile) {
+     if (isInitialized() && hasListener()) {
+       Object callback = DistributedRegion.TEST_HOOK_ADD_PROFILE ? profile : null;
+       RegionEventImpl event = new RegionEventImpl(PartitionedRegion.this,
+           Operation.REGION_CREATE, callback, true, profile.peerMemberId);
+       dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CREATE,
+             event);
+     }
+   }
+ 
+   /**
+    * This method initializes the partitionedRegionDataStore for this PR.
+    * 
+    * @param ra
+    *                Region attributes
+    */
+   private void initializeDataStore(RegionAttributes ra) {
+ 
+     this.dataStore = PartitionedRegionDataStore.createDataStore(cache, this, ra
+         .getPartitionAttributes());
+   }
+ 
+   protected DistributedLockService getPartitionedRegionLockService() {
+     return getGemFireCache().getPartitionedRegionLockService();
+   }
+   
+   /**
+    * Register this PartitionedRegion by: 1) Create a PartitionRegionConfig and
+    * 2) Bind it into the allPartitionedRegion system wide Region.
+    * 
+    * @param storesData
+    *                which indicates whether the instance in this cache stores
+    *                data, affecting the Node's PRType
+    * 
+    * @see Node#setPRType(int)
+    */
+   private void registerPartitionedRegion(boolean storesData) {
+     // Register this PartitionedRegion. First check if the PartitionedRegion
+     // entry already exists globally.
+     PartitionRegionConfig prConfig = null;
+     PartitionAttributes prAttribs = getAttributes().getPartitionAttributes();
+     if (storesData) {
+       if (this.fixedPAttrs != null) {
+         this.node.setPRType(Node.FIXED_PR_DATASTORE);
+       } else {
+         this.node.setPRType(Node.ACCESSOR_DATASTORE);
+       }
+       this.node.setPersistence(getAttributes().getDataPolicy() == DataPolicy.PERSISTENT_PARTITION);
+       byte loaderByte = (byte)(getAttributes().getCacheLoader() != null ? 0x01 : 0x00);
+       byte writerByte = (byte)(getAttributes().getCacheWriter() != null ? 0x02 : 0x00);
+       this.node.setLoaderWriterByte((byte)(loaderByte + writerByte));
+     }
+     else {
+       if (this.fixedPAttrs != null) {
+         this.node.setPRType(Node.FIXED_PR_ACCESSOR);
+       } else {
+         this.node.setPRType(Node.ACCESSOR);
+       }
+     }
+     final RegionLock rl = getRegionLock();
+     try {
+       if (logger.isDebugEnabled()) {
+         logger.debug("registerPartitionedRegion: obtaining lock");
+       }
+       rl.lock();
+       checkReadiness();
+       
+       prConfig = this.prRoot.get(getRegionIdentifier());
+       
+       if (prConfig == null) {
+         validateParallelGatewaySenderIds();
+         this.partitionedRegionId = generatePRId(getSystem());
+         prConfig = new PartitionRegionConfig(this.partitionedRegionId,
+             this.getFullPath(), prAttribs, this.getScope(),
+             getAttributes().getEvictionAttributes(), 
+             getAttributes().getRegionIdleTimeout(), 
+             getAttributes().getRegionTimeToLive(), 
+             getAttributes().getEntryIdleTimeout(),
+             getAttributes().getEntryTimeToLive(),
+             this.getAllGatewaySenderIds());
+         logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_BORN_WITH_PRID_1_IDENT_2,
+               new Object[] { getFullPath(), Integer.valueOf(this.partitionedRegionId), getRegionIdentifier()}));
+ 
+         PRSanityCheckMessage.schedule(this);
+       }
+       else {
+         validator.validatePartitionAttrsFromPRConfig(prConfig);
+         if (storesData) {
+           validator.validatePersistentMatchBetweenDataStores(prConfig);
+           validator.validateCacheLoaderWriterBetweenDataStores(prConfig);
+           validator.validateFixedPABetweenDataStores(prConfig);
+         }
+         
+         this.partitionedRegionId = prConfig.getPRId();
+         logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_CREATED_WITH_PRID_1,
+               new Object[] { getFullPath(), Integer.valueOf(this.partitionedRegionId)}));
+       }
+ 
+       synchronized (prIdToPR) {
+         prIdToPR.put(Integer.valueOf(this.partitionedRegionId), this); // last
+       }
+       prConfig.addNode(this.node);
+       if (this.getFixedPartitionAttributesImpl() != null) {
+         calculateStartingBucketIDs(prConfig);
+       }
+       updatePRConfig(prConfig, false);
+       this.cleanPRRegistration = true;
+     }
+     catch (LockServiceDestroyedException lsde) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("registerPartitionedRegion: unable to obtain lock for {}", this);
+       }
+       cleanupFailedInitialization();
+       throw new PartitionedRegionException(
+           LocalizedStrings.PartitionedRegion_CAN_NOT_CREATE_PARTITIONEDREGION_FAILED_TO_ACQUIRE_REGIONLOCK
+               .toLocalizedString(), lsde);
+     }
+     catch (IllegalStateException ill) {
+       cleanupFailedInitialization();
+       throw ill;
+     }
+     catch (VirtualMachineError err) {
+       SystemFailure.initiateFailure(err);
+       // If this ever returns, rethrow the error.  We're poisoned
+       // now, so don't let this thread continue.
+       throw err;
+     }
+     catch (Throwable t) {
+       // Whenever you catch Error or Throwable, you must also
+       // catch VirtualMachineError (see above).  However, there is
+       // _still_ a possibility that you are dealing with a cascading
+       // error condition, so you also need to check to see if the JVM
+       // is still usable:
+       SystemFailure.checkFailure();
+       String registerErrMsg = 
+         LocalizedStrings.PartitionedRegion_AN_EXCEPTION_WAS_CAUGHT_WHILE_REGISTERING_PARTITIONEDREGION_0_DUMPPRID_1
+         .toLocalizedString(new Object[] {getFullPath(), prIdToPR.dump()});
+       try {
+         synchronized (prIdToPR) {
+           if (prIdToPR.containsKey(Integer.valueOf(this.partitionedRegionId))) {
+             prIdToPR.put(Integer.valueOf(this.partitionedRegionId),
+                 PRIdMap.FAILED_REGISTRATION, false);
+             logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_FAILED_REGISTRATION_PRID_0_NAMED_1,
+                   new Object[] {Integer.valueOf(this.partitionedRegionId), this.getName()}));
+           }
+         }
+       }
+       catch (VirtualMachineError err) {
+         SystemFailure.initiateFailure(err);
+         // If this ever returns, rethrow the error.  We're poisoned
+         // now, so don't let this thread continue.
+         throw err;
+       }
+       catch (Throwable ignore) {
+         // Whenever you catch Error or Throwable, you must also
+         // catch VirtualMachineError (see above).  However, there is
+         // _still_ a possibility that you are dealing with a cascading
+         // error condition, so you also need to check to see if the JVM
+         // is still usable:
+         SystemFailure.checkFailure();
+         if (logger.isDebugEnabled()) {
+           logger.debug("Partitioned Region creation, could not clean up after caught exception", ignore);
+         }
+       }
+       throw new PartitionedRegionException(registerErrMsg, t);
+     }
+     finally {
+       try {
+         rl.unlock();
+         if (logger.isDebugEnabled()) {
+           logger.debug("registerPartitionedRegion: released lock");
+         }
+       }
+       catch (Exception es) {
+         if (logger.isDebugEnabled()) {
+           logger.warn(es.getMessage(), es);
+         }
+       }
+     }
+   }
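+ 
+   // The registration above is a lock-protected check-then-act against the
+   // global metadata region. A condensed sketch of the flow (names taken
+   // from the method above, constructor arguments elided):
+   //
+   //   rl.lock();
+   //   try {
+   //     PartitionRegionConfig cfg = prRoot.get(getRegionIdentifier());
+   //     if (cfg == null) {            // first member: mint a new PR id
+   //       cfg = new PartitionRegionConfig(generatePRId(getSystem()), ...);
+   //     } else {                      // later member: validate against it
+   //       validator.validatePartitionAttrsFromPRConfig(cfg);
+   //     }
+   //     cfg.addNode(this.node);
+   //     updatePRConfig(cfg, false);   // publish back to prRoot
+   //   } finally {
+   //     rl.unlock();
+   //   }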
+ 
+   public void validateParallelGatewaySenderIds() throws PRLocallyDestroyedException {
+     for (String senderId : this.getParallelGatewaySenderIds()) {
+       for (PartitionRegionConfig config : this.prRoot.values()) {
+         if (config.getGatewaySenderIds().contains(senderId)) {
+           Map<String, PartitionedRegion> colocationMap = ColocationHelper
+               .getAllColocationRegions(this);
+           if (!colocationMap.isEmpty()) {
+             if (colocationMap.containsKey(config.getFullPath())) {
+               continue;
+             }
+             else {
+               int prID = config.getPRId();
+               PartitionedRegion colocatedPR = PartitionedRegion
+                   .getPRFromId(prID);
+               PartitionedRegion leader = ColocationHelper
+                   .getLeaderRegion(colocatedPR);
+               if (colocationMap.containsValue(leader)) {
+                 continue;
+               }
+               else {
+                 throw new IllegalStateException(
+                     LocalizedStrings.PartitionRegion_NON_COLOCATED_REGIONS_1_2_CANNOT_HAVE_SAME_PARALLEL_GATEWAY_SENDER_ID_2.toString(new Object[] {
+                         this.getFullPath(),
+                         config.getFullPath(),
+                         senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX) ? "async event queue": "gateway sender",
+                         senderId }));
+               }
+             }
+           }
+           else {
+             throw new IllegalStateException(
+                 LocalizedStrings.PartitionRegion_NON_COLOCATED_REGIONS_1_2_CANNOT_HAVE_SAME_PARALLEL_GATEWAY_SENDER_ID_2.toString(new Object[] {
+                     this.getFullPath(),
+                     config.getFullPath(),
+                     senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX) ? "async event queue": "gateway sender",
+                     senderId }));
+           }
+ 
+         }
+       }
+     }
+   }
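+ 
+   // In effect, two partitioned regions may share a parallel gateway sender
+   // (or async event queue) id only when they are colocated. A hypothetical
+   // sketch of an accepted configuration -- the region and sender names are
+   // illustrative, not taken from this file:
+   //
+   //   PartitionAttributesFactory paf = new PartitionAttributesFactory();
+   //   paf.setColocatedWith("/orders"); // /orders already uses this sender
+   //   cache.createRegionFactory(RegionShortcut.PARTITION)
+   //       .setPartitionAttributes(paf.create())
+   //       .addGatewaySenderId("parallelSender")
+   //       .create("orderDetails");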
+ 
+   /**
+    * @return whether this region requires event notification for all cache
+    *         content changes from other nodes
+    */
+   public boolean getRequiresNotification() {
+     return this.requiresNotification;
+   }
+ 
+   /**
+    * Get the Partitioned Region identifier used for DLocks (Bucket and Region)
+    */
+   public final String getRegionIdentifier() {
+     return this.regionIdentifier;
+   }
+   
+   void setRecoveredFromDisk() {
+     this.recoveredFromDisk = true;
+     new UpdateAttributesProcessor(this).distribute(false);
+   }
+ 
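+   /**
+    * Publishes this region's PartitionRegionConfig to the global metadata
+    * region, first attempting to mark colocation complete (under the
+    * colocated parent's region lock) when a colocated parent exists.
+    *
+    * @param putOnlyIfUpdated
+    *          when true, write the config back only if colocation was just
+    *          marked complete
+    */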
+   public final void updatePRConfig(PartitionRegionConfig prConfig,
+       boolean putOnlyIfUpdated) {
+     final Set<Node> nodes = prConfig.getNodes();
+     final PartitionedRegion colocatedRegion = ColocationHelper
+         .getColocatedRegion(this);
+     RegionLock colocatedLock = null;
+     boolean colocatedLockAcquired = false;
+     try {
+       boolean colocationComplete = false;
+       if (colocatedRegion != null && !prConfig.isColocationComplete() &&
+         // if the current node is marked uninitialized (SQLF DDL replay in
+         // progress) then colocation will definitely not be marked complete so
+         // avoid taking the expensive region lock
+           !getCache().isUnInitializedMember(getDistributionManager().getId())) {
+         colocatedLock = colocatedRegion.getRegionLock();
+         colocatedLock.lock();
+         colocatedLockAcquired = true;
+         final PartitionRegionConfig parentConf = this.prRoot
+             .get(colocatedRegion.getRegionIdentifier());
+         if (parentConf.isColocationComplete()
+             && parentConf.hasSameDataStoreMembers(prConfig)) {
+           colocationComplete = true;
+           // check if all the nodes have been initialized (SQLF bug #42089)
+           for (Node node : nodes) {
+             if (getCache().isUnInitializedMember(node.getMemberId())) {
+               colocationComplete = false;
+               break;
+             }
+           }
+           if (colocationComplete) {
+             prConfig.setColocationComplete();
+           }
+         }
+       }
+ 
+     if (isDataStore() && !prConfig.isFirstDataStoreCreated()) {
+         prConfig.setDatastoreCreated(getEvictionAttributes());
+       }
+       // N.B.: this put could fail with a CacheClosedException:
+       if (!putOnlyIfUpdated || colocationComplete) {
+         this.prRoot.put(getRegionIdentifier(), prConfig);
+       }
+     } finally {
+       if (colocatedLockAcquired) {
+         colocatedLock.unlock();
+       }
+     }
+   }
+ 
+   /**
+    * @param keyInfo
+    *          the key (and bucket, once resolved) for the requested entry
+    * @param access
+    *          true if caller wants last accessed time updated
+    * @param allowTombstones
+    *          whether a tombstone can be returned
+    * @return the entry for the given key, or null if it cannot be found
+    */
+   @Override
+   protected Region.Entry<?, ?> nonTXGetEntry(KeyInfo keyInfo, boolean access, boolean allowTombstones) {
+     final long startTime = PartitionedRegionStats.startTime();
+     final Object key = keyInfo.getKey();
+     try {
+       int bucketId = keyInfo.getBucketId();
+       if (bucketId == KeyInfo.UNKNOWN_BUCKET) {
+         bucketId = PartitionedRegionHelper.getHashKey(this,
+             Operation.GET_ENTRY, key, null, null);
+         keyInfo.setBucketId(bucketId);
+       }
+       InternalDistributedMember targetNode = getOrCreateNodeForBucketRead(bucketId);
+       return getEntryInBucket(targetNode, bucketId, key, access, allowTombstones);
+     }
+     finally {
+       this.prStats.endGetEntry(startTime);
+     }
+   }
+ 
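+   /**
+    * Fetches the entry for the given key from the member hosting its bucket,
+    * reading locally when this member is the host and remotely otherwise.
+    * Retries on bucket movement, re-resolving the target node between
+    * attempts, up to the limit computed by calcRetry().
+    */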
+   protected EntrySnapshot getEntryInBucket(
+       final DistributedMember targetNode, final int bucketId,
+       final Object key, boolean access, final boolean allowTombstones) {
+     final int retryAttempts = calcRetry();
+     if (logger.isTraceEnabled()) {
+       logger.trace("getEntryInBucket: " + "Key key={} ({}) from: {} bucketId={}",
+           key, key.hashCode(), targetNode, bucketStringForLogs(bucketId));
+     }
+     Integer bucketIdInt = Integer.valueOf(bucketId);
+     EntrySnapshot ret = null;
+     int count = 0;
+     RetryTimeKeeper retryTime = null;
+     InternalDistributedMember retryNode = (InternalDistributedMember)targetNode;
+     while (count <= retryAttempts) {
+       // Every continuation should check for DM cancellation
+       if (retryNode == null) {
+         checkReadiness();
+         if (retryTime == null) {
+           retryTime = new RetryTimeKeeper(this.retryTimeout);
+         }
+         if (retryTime.overMaximum()) {
+           break;
+         }
+         retryNode = getOrCreateNodeForBucketRead(bucketId);
+ 
+         // No storage found for bucket, early out preventing hot loop, bug 36819
+         if (retryNode == null) {
+           checkShutdown();
+           return null;
+         }
+         continue;
+       }
+       try {
+         final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId());
+         if (loc) {
+           ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true);
+         } else {
+           ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones);
+           // TODO (Suranjan & Yogesh): there should be a better way than this
+           String name = Thread.currentThread().getName();
+           if (name.startsWith("ServerConnection")
+               && !getMyId().equals(targetNode)) {
+             setNetworkHop(bucketIdInt, (InternalDistributedMember)targetNode);
+           }
+         }
+         
+         return ret;
+       }
+       catch (PRLocallyDestroyedException pde) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException ");
+         }
+         checkReadiness();
+       }
+       catch (EntryNotFoundException enfe) {
+         return null;
+       }
+       catch (ForceReattemptException prce) {
+         prce.checkKey(key);
+         if (logger.isDebugEnabled()) {
+           logger.debug("getEntryInBucket: retrying, attempts so far: {}", count, prce);
+         }
+         checkReadiness();
+         InternalDistributedMember lastNode = retryNode;
+         retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue());
+         if (lastNode.equals(retryNode)) {
+           if (retryTime == null) {
+             retryTime = new RetryTimeKeeper(this.retryTimeout);
+           }
+           if (retryTime.overMaximum()) {
+             break;
+           }
+           retryTime.waitToRetryNode();
+         }
+       }
+       catch (PrimaryBucketException notPrimary) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), retryNode);
+         }
+         getRegionAdvisor().notPrimary(bucketIdInt.intValue(), retryNode);
+         retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue());
+       }
+ 
+       // It's possible this is a GemFire thread e.g. ServerConnection
+       // which got to this point because of a distributed system shutdown or
+       // region closure which uses interrupt to break any sleep() or wait()
+       // calls
+       // e.g. waitForPrimary
+       checkShutdown();
+ 
+       count++;
+       if (count == 1) {
+         this.prStats.incContainsKeyValueOpsRetried();
+       }
+       this.prStats.incContainsKeyValueRetries();
+ 
+     }
+ 
+     PartitionedRegionDistributionException e = null; // Fix for bug 36014
+     if (logger.isDebugEnabled()) {
+       e = new PartitionedRegionDistributionException(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS.toLocalizedString(Integer.valueOf(count)));
+     }
+     logger.warn(LocalizedMessage.create(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS, Integer.valueOf(count)), e);
+     return null;
+   }
+ 
+   /**
+    * Check for region closure, region destruction, cache closure as well as
+    * distributed system disconnect. As of 6/21/2007, this method performed at
+    * least four volatile variable reads and one synchronization upon
+    * completion.
+    */
+   private void checkShutdown() {
+     checkReadiness();
+     this.cache.getCancelCriterion().checkCancelInProgress(null);
+   }
+ 
+   /**
+    * Fetches an entry remotely from the node hosting the bucket for the key.
+    * 
+    * @param targetNode
+    *          the node where bucket region for the key exists.
+    * @param bucketId
+    *          the bucket id for the key.
+    * @param key
+    *          the key whose entry needs to be fetched
+    * @param access
+    *          true if caller wants last access time updated
+    * @param allowTombstones whether tombstones should be returned
+    * @throws EntryNotFoundException
+    *           if the entry doesn't exist
+    * @throws ForceReattemptException
+    *           if the peer is no longer available
+    * @throws PrimaryBucketException
+    * @return the entry snapshot, or null when the entry is a tombstone and
+    *         tombstones are not allowed
+    */
+   public EntrySnapshot getEntryRemotely(
+       InternalDistributedMember targetNode,
+       Integer bucketId, Object key, boolean access, boolean allowTombstones)
+       throws EntryNotFoundException, PrimaryBucketException,
+       ForceReattemptException {
+     FetchEntryResponse r = FetchEntryMessage
+         .send(targetNode, this, key, access);
+     this.prStats.incPartitionMessagesSent();
+     EntrySnapshot entry = r.waitForResponse();
+     if (entry != null && entry.getRawValue() == Token.TOMBSTONE){
+       if (!allowTombstones) {
+         return null;
+   }
+     }
+     return entry;
+   }
+ 
+   // /////////////////////////////////////////////////////////////////
+   // The following methods throw UnsupportedOperationException
+   // /////////////////////////////////////////////////////////////////
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void becomeLockGrantor() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public final Region createSubregion(String subregionName,
+       RegionAttributes regionAttributes) throws RegionExistsException,
+       TimeoutException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public Lock getDistributedLock(Object key) throws IllegalStateException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public CacheStatistics getStatistics() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   public Region getSubregion() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public Lock getRegionDistributedLock() throws IllegalStateException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void loadSnapshot(InputStream inputStream) throws IOException,
+       ClassNotFoundException, CacheWriterException, TimeoutException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * Should this destroy the entry from a local accessor? For now the
+    * operation is unsupported on partitioned regions.
+    * OVERRIDES
+    */
+   @Override
+   public void localDestroy(Object key, Object aCallbackArgument)
+       throws EntryNotFoundException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void localInvalidate(Object key, Object aCallbackArgument)
+       throws EntryNotFoundException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void localInvalidateRegion(Object aCallbackArgument) {
+     getDataView().checkSupportsRegionInvalidate();
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * Executes a query on this PartitionedRegion. The restrictions have already
+    * been checked. The query is a SELECT expression, and the only region it
+    * refers to is this region.
+    * 
+    * @see DefaultQuery#execute()
+    * 
+    * @since 5.1
+    */
+   public Object executeQuery(DefaultQuery query, Object[] parameters,
+       Set buckets) throws FunctionDomainException, TypeMismatchException,
+       NameResolutionException, QueryInvocationTargetException {
+     for (;;) {
+       try {
+         return doExecuteQuery(query, parameters, buckets);
+       } catch (ForceReattemptException fre) {
+         // fall through and loop
+       }
+     }
+   }
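+ 
+   // A minimal usage sketch of how such a query is normally issued
+   // (illustrative only: the region name and predicate are hypothetical,
+   // and exception handling is elided):
+   //
+   //   QueryService qs = cache.getQueryService();
+   //   Query q = qs.newQuery(
+   //       "SELECT DISTINCT * FROM /portfolios WHERE status = 'active'");
+   //   SelectResults results = (SelectResults) q.execute();
+   //
+   // When the only region a SELECT references is a PartitionedRegion, the
+   // query engine routes execution here, which fans out per bucket via
+   // doExecuteQuery() below and retries on ForceReattemptException.
+ 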
+   /**
+    * If ForceReattemptException is thrown then the caller must loop and call us again.
+    * @throws ForceReattemptException if one of the buckets moved out from under us
+    */
+   private Object doExecuteQuery(DefaultQuery query, Object[] parameters,
+       Set buckets)
+   throws FunctionDomainException, TypeMismatchException,
+   NameResolutionException, QueryInvocationTargetException,
+   ForceReattemptException
+   {
+     if (logger.isDebugEnabled()) {
+       logger.debug("Executing query :{}", query);
+     }
+ 
+     HashSet<Integer> allBuckets = new HashSet<Integer>();
+     
+     if (buckets == null) { // remote buckets
+       final Iterator remoteIter = getRegionAdvisor().getBucketSet().iterator();
+       try {
+         while (remoteIter.hasNext()) {
+           allBuckets.add((Integer)remoteIter.next());
+         }
+       }
+       catch (NoSuchElementException stop) {
+       }
+     }
+     else { // local buckets
+       Iterator localIter = null;
+       if (this.dataStore != null) {
+         localIter = buckets.iterator();
+       }
+       else {
+         localIter = Collections.EMPTY_SET.iterator();
+       }
+       try {
+         while (localIter.hasNext()) {
+           allBuckets.add((Integer)localIter.next());        
+         }
+       }
+       catch (NoSuchElementException stop) {
+       }
+     }
+ 
+     if (allBuckets.isEmpty()) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("No bucket storage allocated. PR has no data yet.");
+       }
+       ResultsSet resSet = new ResultsSet();
+       resSet.setElementType(new ObjectTypeImpl(
+           this.getValueConstraint() == null ? Object.class : this
+               .getValueConstraint()));
+       return resSet;
+     }
+ 
+     CompiledSelect selectExpr = query.getSimpleSelect();
+     if (selectExpr == null) {
+       throw new IllegalArgumentException(
+         LocalizedStrings.
+           PartitionedRegion_QUERY_MUST_BE_A_SELECT_EXPRESSION_ONLY
+             .toLocalizedString());
+     }    
+ 
+     // this can return a BAG even if it's a DISTINCT select expression,
+     // since the expectation is that the duplicates will be removed at the end
+     SelectResults results = selectExpr
+         .getEmptyResultSet(parameters, getCache(), query);
+ 
+     PartitionedRegionQueryEvaluator prqe = new PartitionedRegionQueryEvaluator(this.getSystem(), this, query,
+         parameters, results, allBuckets);
+     for (;;) {
+       this.getCancelCriterion().checkCancelInProgress(null);
+       boolean interrupted = Thread.interrupted();
+       try {
+         results = prqe.queryBuckets(null);
+         break;
+       }
+       catch (InterruptedException e) {
+         interrupted = true;
+       }
+       catch (FunctionDomainException e) {
+         throw e;
+       }
+       catch (TypeMismatchException e) {
+         throw e;
+       }
+       catch (NameResolutionException e) {
+         throw e;
+       }
+       catch (QueryInvocationTargetException e) {
+         throw e;
+       }
+       catch (QueryException qe) {
+         throw new QueryInvocationTargetException(LocalizedStrings.PartitionedRegion_UNEXPECTED_QUERY_EXCEPTION_OCCURED_DURING_QUERY_EXECUTION_0.toLocalizedString(qe.getMessage()), qe);
+       }
+       finally {
+         if (interrupted) {
+           Thread.currentThread().interrupt();
+         }
+       }
+     } // for
+ 
+     // Drop duplicates if this is a DISTINCT query
+     boolean allowsDuplicates = results.getCollectionType().allowsDuplicates();
+     // Asif: There is no need to apply the limit to the SelectResults here.
+     // Even without applying it, the results will satisfy the limit, because
+     // it was already enforced while iterating the list that populated the
+     // SelectResults. So if the results is a ResultsBag, StructSet or
+     // ResultsSet and a limit exists, the data set size will match the limit
+     // exactly.
+     if (selectExpr.isDistinct()) {
+       // don't just convert to a ResultsSet (or StructSet), since
+       // the bags can convert themselves to a Set more efficiently
+       ObjectType elementType = results.getCollectionType().getElementType();
+       if (selectExpr.getOrderByAttrs() != null) {
+         // Set the limit also; it is not applied while building the final
+         // result set when order-by is involved.
+         // results = new ResultsCollectionWrapper(elementType, results.asSet(), query.getLimit(parameters));
+       } else if (allowsDuplicates) {
+         results = new ResultsCollectionWrapper(elementType, results.asSet());
+       }
+       if (selectExpr.isCount() && (results.isEmpty() || selectExpr.isDistinct())) {
+         SelectResults resultCount = new ResultsBag(getCachePerfStats()); // constructor with elementType not visible
+         resultCount.setElementType(new ObjectTypeImpl(Integer.class));
+         ((ResultsBag)resultCount).addAndGetOccurence(results.size());
+         return resultCount;
+       }
+     }
+     return results;
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void saveSnapshot(OutputStream outputStream) throws IOException {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void writeToDisk() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /**
+    * @since 5.0
+    * @throws UnsupportedOperationException
+    * OVERRIDES
+    */
+   @Override
+   public void clear() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   void basicLocalClear(RegionEventImpl event) {
+     throw new UnsupportedOperationException();
+   }
+ 
+   // /////////////////////////////////////////////////////////////////////
+   // ////////////// Operations supported in this release ///////////////////
+   // /////////////////////////////////////////////////////////////////////
+ 
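+   /**
+    * Routes the put to the bucket the key hashes to, creating the bucket on
+    * demand when no storage has been assigned yet, then applies the
+    * operation on the target member via putInBucket().
+    */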
+   @Override
+   boolean virtualPut(EntryEventImpl event,
+                      boolean ifNew,
+                      boolean ifOld,
+                      Object expectedOldValue,
+                      boolean requireOldValue,
+                      long lastModified,
+                      boolean overwriteDestroyed)
+   throws TimeoutException, CacheWriterException {
+     final long startTime = PartitionedRegionStats.startTime();
+     boolean result = false;
+     final DistributedPutAllOperation putAllOp_save = event.setPutAllOperation(null);
+     
+     if (event.getEventId() == null) {
+       event.setNewEventId(this.cache.getDistributedSystem());
+     }
+     boolean bucketStorageAssigned = true;
+     try {
+       final Integer bucketId = event.getKeyInfo().getBucketId();
+       assert bucketId != KeyInfo.UNKNOWN_BUCKET;
+       // check in bucket2Node region
+       InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId
+           .intValue(), null);
+       // force all values to be serialized early to make size computation cheap
+       // and to optimize distribution.
+       if (logger.isDebugEnabled()) {
+         logger.debug("PR.virtualPut putting event={}", event);
+       }
+ 
+       if (targetNode == null) {
+         try {
+           bucketStorageAssigned=false;
+           // if this is a Delta update, then throw exception since the key doesn't
+           // exist if there is no bucket for it yet
+           // For HDFS region, we will recover key, so allow bucket creation
+           if (!this.dataPolicy.withHDFS() && event.hasDelta()) {
+             throw new EntryNotFoundException(LocalizedStrings.
+               PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY
+                 .toLocalizedString());
+           }
+           targetNode = createBucket(bucketId.intValue(), event.getNewValSizeForPR(),
+               null);
+         }
+         catch (PartitionedRegionStorageException e) {
+           // try not to throw a PRSE if the cache is closing or this region was
+           // destroyed during createBucket() (bug 36574)
+           this.checkReadiness();
+           if (this.cache.isClosed()) {
+             throw new RegionDestroyedException(toString(), getFullPath());
+           }
+           throw e;
+         }
+       }
+ 
+       if (event.isBridgeEvent() && bucketStorageAssigned) {
+         setNetworkHop(bucketId, targetNode);
+       }
+       if (putAllOp_save == null) {
+         result = putInBucket(targetNode,
+                            bucketId,
+                            event,
+                            ifNew,
+                            ifOld,
+                            expectedOldValue,

<TRUNCATED>