You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:33 UTC
[71/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 0000000,b145a91..541c453
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@@ -1,0 -1,11398 +1,11398 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package com.gemstone.gemfire.internal.cache;
+
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Hashtable;
+ import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Random;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.FutureTask;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.locks.Lock;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.InternalGemFireException;
+ import com.gemstone.gemfire.StatisticsFactory;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.cache.AttributesFactory;
+ import com.gemstone.gemfire.cache.AttributesMutator;
+ import com.gemstone.gemfire.cache.Cache;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.CacheException;
+ import com.gemstone.gemfire.cache.CacheListener;
+ import com.gemstone.gemfire.cache.CacheLoader;
+ import com.gemstone.gemfire.cache.CacheLoaderException;
+ import com.gemstone.gemfire.cache.CacheStatistics;
+ import com.gemstone.gemfire.cache.CacheWriter;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.CustomExpiry;
+ import com.gemstone.gemfire.cache.DataPolicy;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.cache.EntryExistsException;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.ExpirationAttributes;
+ import com.gemstone.gemfire.cache.InterestPolicy;
+ import com.gemstone.gemfire.cache.InterestRegistrationEvent;
+ import com.gemstone.gemfire.cache.LoaderHelper;
+ import com.gemstone.gemfire.cache.LowMemoryException;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.PartitionAttributes;
+ import com.gemstone.gemfire.cache.PartitionResolver;
+ import com.gemstone.gemfire.cache.PartitionedRegionDistributionException;
+ import com.gemstone.gemfire.cache.PartitionedRegionStorageException;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionAttributes;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.RegionEvent;
+ import com.gemstone.gemfire.cache.RegionExistsException;
+ import com.gemstone.gemfire.cache.RegionMembershipListener;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
+ import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
+ import com.gemstone.gemfire.cache.TransactionException;
+ import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+ import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+ import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException;
+ import com.gemstone.gemfire.cache.execute.Function;
+ import com.gemstone.gemfire.cache.execute.FunctionContext;
+ import com.gemstone.gemfire.cache.execute.FunctionException;
+ import com.gemstone.gemfire.cache.execute.FunctionService;
+ import com.gemstone.gemfire.cache.execute.ResultCollector;
+ import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+ import com.gemstone.gemfire.cache.partition.PartitionListener;
+ import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
+ import com.gemstone.gemfire.cache.query.FunctionDomainException;
+ import com.gemstone.gemfire.cache.query.Index;
+ import com.gemstone.gemfire.cache.query.IndexCreationException;
+ import com.gemstone.gemfire.cache.query.IndexExistsException;
+ import com.gemstone.gemfire.cache.query.IndexInvalidException;
+ import com.gemstone.gemfire.cache.query.IndexNameConflictException;
+ import com.gemstone.gemfire.cache.query.IndexType;
+ import com.gemstone.gemfire.cache.query.MultiIndexCreationException;
+ import com.gemstone.gemfire.cache.query.NameResolutionException;
+ import com.gemstone.gemfire.cache.query.QueryException;
+ import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+ import com.gemstone.gemfire.cache.query.SelectResults;
+ import com.gemstone.gemfire.cache.query.TypeMismatchException;
+ import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
+ import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+ import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
+ import com.gemstone.gemfire.cache.query.internal.QCompiler;
+ import com.gemstone.gemfire.cache.query.internal.QueryExecutor;
+ import com.gemstone.gemfire.cache.query.internal.ResultsBag;
+ import com.gemstone.gemfire.cache.query.internal.ResultsCollectionWrapper;
+ import com.gemstone.gemfire.cache.query.internal.ResultsSet;
+ import com.gemstone.gemfire.cache.query.internal.index.AbstractIndex;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
+ import com.gemstone.gemfire.cache.query.internal.index.PartitionedIndex;
+ import com.gemstone.gemfire.cache.query.internal.types.ObjectTypeImpl;
+ import com.gemstone.gemfire.cache.query.types.ObjectType;
+ import com.gemstone.gemfire.cache.wan.GatewaySender;
+ import com.gemstone.gemfire.distributed.DistributedLockService;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
+ import com.gemstone.gemfire.distributed.internal.DM;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+ import com.gemstone.gemfire.distributed.internal.DistributionManager;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem.DisconnectListener;
+ import com.gemstone.gemfire.distributed.internal.MembershipListener;
+ import com.gemstone.gemfire.distributed.internal.ProfileListener;
+ import com.gemstone.gemfire.distributed.internal.ReplyException;
+ import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+ import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
+ import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.NanoTimer;
+ import com.gemstone.gemfire.internal.SetUtils;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.BucketAdvisor.ServerBucketProfile;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
+ import com.gemstone.gemfire.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse;
+ import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+ import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+ import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+ import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
+ import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+ import com.gemstone.gemfire.internal.cache.execute.FunctionExecutionNodePruner;
+ import com.gemstone.gemfire.internal.cache.execute.FunctionRemoteContext;
+ import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
+ import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+ import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionExecutor;
+ import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.execute.PartitionedRegionFunctionResultWaiter;
+ import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
+ import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+ import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
+ import com.gemstone.gemfire.internal.cache.lru.LRUStatistics;
+ import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage.DestroyResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.DestroyRegionOnDataStoreMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpAllPRConfigMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.DumpBucketsMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchBulkEntriesMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchBulkEntriesMessage.FetchBulkEntriesResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntriesMessage.FetchEntriesResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntryMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchEntryMessage.FetchEntryResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchKeysMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.FetchKeysMessage.FetchKeysResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.GetMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.GetMessage.GetResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityRequestMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityRequestMessage.IdentityResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityUpdateMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.IdentityUpdateMessage.IdentityUpdateResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.IndexCreationMsg;
+ import com.gemstone.gemfire.internal.cache.partitioned.InterestEventMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.InterestEventMessage.InterestEventResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage.InvalidateResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRSanityCheckMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage.UpdateEntryVersionResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage.PartitionResponse;
+ import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserver;
+ import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionObserverHolder;
+ import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult;
+ import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+ import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor;
+ import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
+ import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.SizeMessage;
+ import com.gemstone.gemfire.internal.cache.partitioned.SizeMessage.SizeResponse;
+ import com.gemstone.gemfire.internal.cache.persistence.PRPersistentConfig;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.command.Get70;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+ import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
+ import com.gemstone.gemfire.internal.util.TransformUtils;
+ import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
+ import com.gemstone.gemfire.i18n.StringId;
+
+ /**
+ * A Region whose total storage is split into chunks of data (partitions) which
+ * are copied up to a configurable level (for high availability) and placed on
+ * multiple VMs for improved performance and increased storage capacity.
+ *
+ */
+ public class PartitionedRegion extends LocalRegion implements
+ CacheDistributionAdvisee, QueryExecutor {
+
+ public static final Random rand = new Random(Long.getLong(
+ "gemfire.PartitionedRegionRandomSeed", NanoTimer.getTime()).longValue());
+
+ private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger();
+
+ private final DiskRegionStats diskRegionStats;
+ /**
+ * Changes scope of replication to secondary bucket to SCOPE.DISTRIBUTED_NO_ACK
+ */
+ public static final boolean DISABLE_SECONDARY_BUCKET_ACK = Boolean.getBoolean(
+ "gemfire.disablePartitionedRegionBucketAck");
+
+ /**
+ * A debug flag used for testing calculation of starting bucket id
+ */
+ public static boolean BEFORE_CALCULATE_STARTING_BUCKET_FLAG = false;
+
+ /**
+ * Thread specific random number
+ */
+ private static ThreadLocal threadRandom = new ThreadLocal() {
+ @Override
+ protected Object initialValue() {
+ int i = rand.nextInt();
+ if (i < 0) {
+ i = -1 * i;
+ }
+ return Integer.valueOf(i);
+ }
+ };
+
+ /**
+ * Global Region for storing PR config ( PRName->PRConfig). This region would
+ * be used to resolve PR name conflict.*
+ */
+ private volatile Region<String, PartitionRegionConfig> prRoot;
+
+ /**
+ *
+ * PartitionedRegionDataStore class takes care of data storage for the PR.
+ * This will contain the bucket Regions to store data entries for PR*
+ */
+ protected PartitionedRegionDataStore dataStore;
+
+ /**
+ * The advisor that hold information about this partitioned region
+ */
+ private final RegionAdvisor distAdvisor;
+
+ /** Logging mechanism for debugging */
+ private static final Logger logger = LogService.getLogger();
+
+ /** cleanup flags * */
+ private boolean cleanPRRegistration = false;
+
+ /** Time to wait for for acquiring distributed lock ownership */
+ final static long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter
+ .parseLong(
+ System
+ .getProperty(PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_PROPERTY),
+ PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_DEFAULT);
+
+ /**
+ * default redundancy level is 0.
+ */
+ final int redundantCopies;
+
+ /**
+ * The miminum amount of redundancy needed for a write operation
+ */
+ final int minimumWriteRedundancy;
+
+ /**
+ * The miminum amount of redundancy needed for a read operation
+ */
+ final int minimumReadRedundancy;
+
+ /**
+ * Ratio of currently allocated memory to maxMemory that triggers rebalance
+ * activity.
+ */
+ final static float rebalanceThreshold = 0.75f;
+
+ /** The maximum memory allocated for this node in Mb */
+ final int localMaxMemory;
+
+ /** The maximum milliseconds for retrying operations */
+ final private int retryTimeout;
+
+ /**
+ * The statistics for this PR
+ */
+ public final PartitionedRegionStats prStats;
+
+ // private Random random = new Random(System.currentTimeMillis());
+
+ /** Number of initial buckets */
+ private final int totalNumberOfBuckets;
+
+ /**
+ * To check if local cache is enabled.
+ */
+ private static final boolean localCacheEnabled = false;
+
+ // private static final boolean throwIfNoNodesLeft = true;
+
+ public static final int DEFAULT_RETRY_ITERATIONS = 3;
+
+ /**
+ * Flag to indicate if a cache loader is present
+ */
+ private volatile boolean haveCacheLoader;
+
+ /**
+ * Region identifier used for DLocks (Bucket and Region)
+ */
+ private final String regionIdentifier;
+
+ /**
+ * Maps each PR to a prId. This prId will uniquely identify the PR.
+ */
+ static final PRIdMap prIdToPR = new PRIdMap();
+
+ /**
+ * Flag to indicate whether region is closed
+ *
+ */
+ public volatile boolean isClosed = false;
+
+ /**
+ * a flag indicating that the PR is destroyed in this VM
+ */
+ public volatile boolean isLocallyDestroyed = false;
+
+ /**
+ * the thread locally destroying this pr. not volatile,
+ * so always check isLocallyDestroyed before checking locallyDestroyingThread
+ *
+ * Concurrency: {@link #isLocallyDestroyed} is volatile
+ */
+ public Thread locallyDestroyingThread;
+
+ // TODO someone please add a javadoc for this
+ private volatile boolean hasPartitionedIndex = false;
+
+ /**
+ * regionMembershipListener notification requires this to be plugged into
+ * a PR's RegionAdvisor
+ */
+ private final AdvisorListener advisorListener = new AdvisorListener();
+
+ /*
+ * Map containing <IndexTask, FutureTask<IndexTask> or Index>.
+ * IndexTask represents an index thats completely created or
+ * one thats in create phase. This is done in order to avoid
+ * synchronization on the indexes.
+ */
+ private final ConcurrentMap indexes = new ConcurrentHashMap();
+
+ private volatile boolean recoveredFromDisk;
+
+ public static final int RUNNING_MODE = -1;
+ public static final int PRIMARY_BUCKETS_LOCKED = 1;
+ public static final int DISK_STORE_FLUSHED = 2;
+ public static final int OFFLINE_EQUAL_PERSISTED = 3;
+
+ private volatile int shutDownAllStatus = RUNNING_MODE;
+
+ private final long birthTime = System.currentTimeMillis();
+
+ public void setShutDownAllStatus(int newStatus) {
+ this.shutDownAllStatus = newStatus;
+ }
+
+ private final PartitionedRegion colocatedWithRegion;
+
+ private List<BucketRegion> sortedBuckets;
+
+ private ScheduledExecutorService bucketSorter;
+
+ private ConcurrentMap<String, Integer[]> partitionsMap = new ConcurrentHashMap<String, Integer[]>();
+
+ public ConcurrentMap<String, Integer[]> getPartitionsMap() {
+ return this.partitionsMap;
+ }
+ /**
+ * for wan shadowPR
+ */
+ private boolean enableConflation;
+
+ private final Object indexLock = new Object();
+
+ /**
+ * Byte 0 = no NWHOP Byte 1 = NWHOP to servers in same server-grp Byte 2 =
+ * NWHOP tp servers in other server-grp
+ */
+ private final ThreadLocal<Byte> isNetworkHop = new ThreadLocal<Byte>() {
+ @Override
+ protected Byte initialValue() {
+ return Byte.valueOf((byte)0);
+ }
+ };
+
+ public void setIsNetworkHop(Byte value) {
+ this.isNetworkHop.set(value);
+ }
+
+ public Byte isNetworkHop() {
+ return this.isNetworkHop.get();
+ }
+
+ private final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() {
+ @Override
+ protected Byte initialValue() {
+ return 0;
+ }
+ };
+
+ public void setMetadataVersion(Byte value) {
+ this.metadataVersion.set(value);
+ }
+
+ public Byte getMetadataVersion() {
+ return this.metadataVersion.get();
+ }
+
+
+ /**
+ * Returns the LRUStatistics for this PR.
+ * This is needed to find the single instance of LRUStatistics
+ * created early for a PR when it is recovered from disk.
+ * This fixes bug 41938
+ */
+ public LRUStatistics getPRLRUStatsDuringInitialization() {
+ LRUStatistics result = null;
+ if (getDiskStore() != null) {
+ result = getDiskStore().getPRLRUStats(this);
+ }
+ return result;
+ }
+
+
+ ////////////////// ConcurrentMap methods //////////////////
+
+ @Override
+ public boolean remove(Object key, Object value, Object callbackArg) {
+ final long startTime = PartitionedRegionStats.startTime();
+ try {
+ return super.remove(key, value, callbackArg);
+ }
+ finally {
+ this.prStats.endDestroy(startTime);
+ }
+ }
+
+
+
+ ////////////////// End of ConcurrentMap methods //////////////////
+
+
+ public PartitionListener[] getPartitionListeners() {
+ return this.partitionListeners;
+ }
+
+ /**
+ * Return canonical representation for a bucket (for logging)
+ *
+ * @param bucketId
+ * the bucket
+ * @return a String representing this PR and the bucket
+ */
+ public String bucketStringForLogs(int bucketId) {
+ return getPRId() + BUCKET_ID_SEPARATOR + bucketId;
+ }
+
+ /** Separator between PRId and bucketId for creating bucketString */
+ public static final String BUCKET_ID_SEPARATOR = ":";
+
+ /**
+ * Clear the prIdMap, typically used when disconnecting from the distributed
+ * system or clearing the cache
+ */
+ public static void clearPRIdMap() {
+ synchronized (prIdToPR) {
+ prIdToPR.clear();
+ }
+ }
+
+ private static DisconnectListener dsPRIdCleanUpListener = new DisconnectListener() {
+ @Override
+ public String toString() {
+ return LocalizedStrings.PartitionedRegion_SHUTDOWN_LISTENER_FOR_PARTITIONEDREGION.toLocalizedString();
+ }
+
+ public void onDisconnect(InternalDistributedSystem sys) {
+ clearPRIdMap();
+ }
+ };
+
+
+ public static class PRIdMap extends HashMap {
+ private static final long serialVersionUID = 3667357372967498179L;
+ public final static String DESTROYED = "Partitioned Region Destroyed";
+
+ final static String LOCALLY_DESTROYED = "Partitioned Region Is Locally Destroyed";
+
+ final static String FAILED_REGISTRATION = "Partitioned Region's Registration Failed";
+
+ public final static String NO_PATH_FOUND = "NoPathFound";
+
+ private volatile boolean cleared = true;
+
+ @Override
+ public Object get(Object key) {
+ throw new UnsupportedOperationException(LocalizedStrings.PartitionedRegion_PRIDMAPGET_NOT_SUPPORTED_USE_GETREGION_INSTEAD.toLocalizedString());
+ }
+
+ public Object getRegion(Object key) throws PRLocallyDestroyedException {
+ if (cleared) {
+ Cache c = GemFireCacheImpl.getInstance();
+ if (c == null) {
+ throw new CacheClosedException();
+ }
+ else {
+ c.getCancelCriterion().checkCancelInProgress(null);
+ }
+ }
+ Assert.assertTrue(key instanceof Integer);
+
+ Object o = super.get(key);
+ if (o == DESTROYED) {
+ throw new RegionDestroyedException(LocalizedStrings.PartitionedRegion_REGION_FOR_PRID_0_IS_DESTROYED.toLocalizedString(key), NO_PATH_FOUND);
+ }
+ if (o == LOCALLY_DESTROYED) {
+ throw new PRLocallyDestroyedException(LocalizedStrings.PartitionedRegion_REGION_WITH_PRID_0_IS_LOCALLY_DESTROYED_ON_THIS_NODE.toLocalizedString(key));
+ }
+ if (o == FAILED_REGISTRATION) {
+ throw new PRLocallyDestroyedException(LocalizedStrings.PartitionedRegion_REGION_WITH_PRID_0_FAILED_INITIALIZATION_ON_THIS_NODE.toLocalizedString(key));
+ }
+ return o;
+ }
+
+ @Override
+ public Object remove(final Object key) {
+ return put(key, DESTROYED, true);
+ }
+
+ @Override
+ public Object put(final Object key, final Object value) {
+ return put(key, value, true);
+ }
+
+ public Object put(final Object key, final Object value,
+ boolean sendIdentityRequestMessage) {
+ if (cleared) {
+ cleared = false;
+ }
+
+ if (key == null) {
+ throw new NullPointerException(LocalizedStrings.PartitionedRegion_NULL_KEY_NOT_ALLOWED_FOR_PRIDTOPR_MAP.toLocalizedString());
+ }
+ if (value == null) {
+ throw new NullPointerException(LocalizedStrings.PartitionedRegion_NULL_VALUE_NOT_ALLOWED_FOR_PRIDTOPR_MAP.toLocalizedString());
+ }
+ Assert.assertTrue(key instanceof Integer);
+ if (sendIdentityRequestMessage)
+ IdentityRequestMessage.setLatestId(((Integer)key).intValue());
+ if ((super.get(key) == DESTROYED) && (value instanceof PartitionedRegion)) {
+ PartitionedRegionException pre = new PartitionedRegionException(LocalizedStrings.PartitionedRegion_CAN_NOT_REUSE_OLD_PARTITIONED_REGION_ID_0.toLocalizedString(key));
+ throw pre;
+ }
+ return super.put(key, value);
+ }
+
+ @Override
+ public void clear() {
+ this.cleared = true;
+ super.clear();
+ }
+
+ public synchronized String dump() {
+ StringBuffer b = new StringBuffer("prIdToPR Map@");
+ b.append(System.identityHashCode(prIdToPR)).append(":\n");
+ Map.Entry me;
+ for (Iterator i = prIdToPR.entrySet().iterator(); i.hasNext();) {
+ me = (Map.Entry)i.next();
+ b.append(me.getKey()).append("=>").append(me.getValue());
+ if (i.hasNext()) {
+ b.append("\n");
+ }
+ }
+ return b.toString();
+ }
+ }
+
+ private int partitionedRegionId = -3;
+
+ // final private Scope userScope;
+
+ /** Node description */
+ final private Node node;
+
+ /** Helper Object for redundancy Management of PartitionedRegion */
+ private final PRHARedundancyProvider redundancyProvider;
+
+ /**
+ * flag saying whether this VM needs cache operation notifications from other
+ * members
+ */
+ private boolean requiresNotification;
+
+ /**
+ * Latch that signals when the Bucket meta-data is ready to receive updates
+ */
+ private final StoppableCountDownLatch initializationLatchAfterBucketIntialization;
+
+ /**
+ * Constructor for a PartitionedRegion. This has an accessor (Region API)
+ * functionality and contains a datastore for actual storage. An accessor can
+ * act as a local cache by having a local storage enabled. A PartitionedRegion
+ * can be created by a factory method of RegionFactory.java and also by
+ * invoking Cache.createRegion(). (Cache.xml etc to be added)
+ *
+ */
+
+ static public final String RETRY_TIMEOUT_PROPERTY =
+ "gemfire.partitionedRegionRetryTimeout";
+
+ private final PartitionRegionConfigValidator validator ;
+
+ final List<FixedPartitionAttributesImpl> fixedPAttrs;
+
+ private byte fixedPASet;
+
+ public List<PartitionedRegion> colocatedByList= new CopyOnWriteArrayList<PartitionedRegion>();
+
+ private final PartitionListener[] partitionListeners;
+
+ private boolean isShadowPR = false;
+ private boolean isShadowPRForHDFS = false;
+
+ private AbstractGatewaySender parallelGatewaySender = null;
+
+ private final ThreadLocal<Boolean> queryHDFS = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ public PartitionedRegion(String regionname, RegionAttributes ra,
+ LocalRegion parentRegion, GemFireCacheImpl cache,
+ InternalRegionArguments internalRegionArgs) {
+ super(regionname, ra, parentRegion, cache, internalRegionArgs);
+
+ this.node = initializeNode();
+ this.prStats = new PartitionedRegionStats(cache.getDistributedSystem(), getFullPath());
+ this.regionIdentifier = getFullPath().replace('/', '#');
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Constructing Partitioned Region {}", regionname);
+ }
+
+ // By adding this disconnect listener we ensure that the pridmap is cleaned
+ // up upon
+ // distributed system disconnect even this (or other) PRs are destroyed
+ // (which prevents pridmap cleanup).
+ cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
+
+ // add an async queue for the region if the store name is not null.
+ if (this.getHDFSStoreName() != null) {
+ String eventQueueName = getHDFSEventQueueName();
+ super.addAsyncEventQueueId(eventQueueName);
+ }
+
+ // this.userScope = ra.getScope();
+ this.partitionAttributes = ra.getPartitionAttributes();
+ this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory();
+ this.retryTimeout = Integer.getInteger(RETRY_TIMEOUT_PROPERTY,
+ PartitionedRegionHelper.DEFAULT_TOTAL_WAIT_RETRY_ITERATION).intValue();
+ this.totalNumberOfBuckets = this.partitionAttributes.getTotalNumBuckets();
+ this.prStats.incTotalNumBuckets(this.totalNumberOfBuckets);
+ this.distAdvisor = RegionAdvisor.createRegionAdvisor(this); // Warning: potential early escape of instance
+ this.redundancyProvider = new PRHARedundancyProvider(this); // Warning:
+ // potential
+ // early escape
+ // instance
+
+ // localCacheEnabled = ra.getPartitionAttributes().isLocalCacheEnabled();
+ // This is to make sure that local-cache get and put works properly.
+ // getScope is overridden to return the correct scope.
+ // this.scope = Scope.LOCAL;
+ this.redundantCopies = ra.getPartitionAttributes().getRedundantCopies();
+ this.prStats.setConfiguredRedundantCopies(ra.getPartitionAttributes().getRedundantCopies());
+ this.prStats.setLocalMaxMemory(ra.getPartitionAttributes().getLocalMaxMemory() * 1024L * 1024);
+
+ // No redundancy required for writes
+ this.minimumWriteRedundancy = Integer.getInteger(
+ "gemfire.mimimumPartitionedRegionWriteRedundancy", 0).intValue();
+ // No redundancy required for reads
+ this.minimumReadRedundancy = Integer.getInteger(
+ "gemfire.mimimumPartitionedRegionReadRedundancy", 0).intValue();
+
+ this.haveCacheLoader = ra.getCacheLoader() != null;
+
+ this.initializationLatchAfterBucketIntialization = new StoppableCountDownLatch(
+ this.getCancelCriterion(), 1);
+
+ this.validator = new PartitionRegionConfigValidator(this);
+ this.partitionListeners = this.partitionAttributes.getPartitionListeners();
+
+ this.colocatedWithRegion = ColocationHelper.getColocatedRegion(this);
+ if (colocatedWithRegion != null) {
+ //In colocation chain, child region inherita the fixed partitin attributes from parent region.
+ this.fixedPAttrs = colocatedWithRegion.getFixedPartitionAttributesImpl();
+ this.fixedPASet = colocatedWithRegion.fixedPASet;
+ synchronized (colocatedWithRegion.colocatedByList) {
+ colocatedWithRegion.colocatedByList.add(this);
+ }
+ }
+ else {
+ this.fixedPAttrs = this.partitionAttributes.getFixedPartitionAttributes();
+ this.fixedPASet = 0;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Partitioned Region {} constructed {}", regionname, (this.haveCacheLoader ? "with a cache loader" : ""));
+ }
+ if (this.getEvictionAttributes() != null
+ && this.getEvictionAttributes().getAlgorithm().isLRUHeap()) {
+ this.sortedBuckets = new ArrayList<BucketRegion>();
+ final ThreadGroup grp = LoggingThreadGroup.createThreadGroup("BucketSorterThread", logger);
+ ThreadFactory tf = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(grp, r, "BucketSorterThread");
+ t.setDaemon(true);
+ return t;
+ }
+ };
+ this.bucketSorter = Executors.newScheduledThreadPool(1, tf);
+ }
+ // If eviction is on, Create an instance of PartitionedRegionLRUStatistics
+ if ((this.getEvictionAttributes() != null
+ && !this.getEvictionAttributes().getAlgorithm().isNone()
+ && this.getEvictionAttributes().getAction().isOverflowToDisk())
+ || this.getDataPolicy().withPersistence()) {
+ StatisticsFactory sf = this.getCache().getDistributedSystem();
+ this.diskRegionStats = new DiskRegionStats(sf, getFullPath());
+ } else {
+ this.diskRegionStats = null;
+ }
+ if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
+ this.isShadowPR = true;
+ this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender();
+ if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue())
+ this.isShadowPRForHDFS = true;
+ }
+
+
+ /*
+ * Start persistent profile logging if we are a persistent region.
+ */
+ if(dataPolicy.withPersistence()) {
+ startPersistenceProfileLogging();
+ }
+ }
+
+ /**
+ * Monitors when other members that participate in this persistent region are removed and creates
+ * a log entry marking the event.
+ */
+ private void startPersistenceProfileLogging() {
+ this.distAdvisor.addProfileChangeListener(new ProfileListener() {
+ @Override
+ public void profileCreated(Profile profile) {
+ }
+
+ @Override
+ public void profileUpdated(Profile profile) {
+ }
+
+ @Override
+ public void profileRemoved(Profile profile, boolean destroyed) {
+ /*
+ * Don't bother logging membership activity if our region isn't ready.
+ */
+ if(isInitialized()) {
+ CacheProfile cacheProfile = ((profile instanceof CacheProfile) ? (CacheProfile) profile : null);
+ Set<String> onlineMembers = new HashSet<String>();
+
+ TransformUtils.transform(PartitionedRegion.this.distAdvisor.advisePersistentMembers().values(),onlineMembers,TransformUtils.persistentMemberIdToLogEntryTransformer);
+
+ logger.info(LocalizedMessage.create(LocalizedStrings.PersistenceAdvisorImpl_PERSISTENT_VIEW,
+ new Object[] {PartitionedRegion.this.getName(),TransformUtils.persistentMemberIdToLogEntryTransformer.transform(cacheProfile.persistentID),onlineMembers}));
+ }
+ }
+ });
+ }
+
+ @Override
+ public final boolean isHDFSRegion() {
+ return this.getHDFSStoreName() != null;
+ }
+
+ @Override
+ public final boolean isHDFSReadWriteRegion() {
+ return isHDFSRegion() && !getHDFSWriteOnly();
+ }
+
+ @Override
+ protected final boolean isHDFSWriteOnly() {
+ return isHDFSRegion() && getHDFSWriteOnly();
+ }
+
+ public final void setQueryHDFS(boolean includeHDFS) {
+ queryHDFS.set(includeHDFS);
+ }
+
+ @Override
+ public final boolean includeHDFSResults() {
+ return queryHDFS.get();
+ }
+
+ public final boolean isShadowPR() {
+ return isShadowPR;
+ }
+
+ public final boolean isShadowPRForHDFS() {
+ return isShadowPRForHDFS;
+ }
+
+ public AbstractGatewaySender getParallelGatewaySender() {
+ return parallelGatewaySender;
+ }
+
+ public Set<String> getParallelGatewaySenderIds() {
+ Set<String> regionGatewaySenderIds = this.getAllGatewaySenderIds();
+ if (regionGatewaySenderIds.isEmpty()) {
+ return Collections.EMPTY_SET;
+ }
+ Set<GatewaySender> cacheGatewaySenders = getCache().getAllGatewaySenders();
+ Set<String> parallelGatewaySenderIds = new HashSet<String>();
+ for (GatewaySender sender : cacheGatewaySenders) {
+ if (regionGatewaySenderIds.contains(sender.getId())
+ && sender.isParallel()) {
+ parallelGatewaySenderIds.add(sender.getId());
+ }
+ }
+ return parallelGatewaySenderIds;
+ }
+
+ List<PartitionedRegion> getColocatedByList() {
+ return this.colocatedByList;
+ }
+
+ public boolean isColocatedBy() {
+ return !this.colocatedByList.isEmpty();
+ }
+
+ private void createAndValidatePersistentConfig() {
+ DiskStoreImpl dsi = this.getDiskStore();
+ if (this.dataPolicy.withPersistence() && !this.concurrencyChecksEnabled
+ && supportsConcurrencyChecks()) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_ENABLING_CONCURRENCY_CHECKS_FOR_PERSISTENT_PR, this.getFullPath()));
+ this.concurrencyChecksEnabled = true;
+ }
+ if (dsi != null && this.getDataPolicy().withPersistence()) {
+ String colocatedWith = colocatedWithRegion == null
+ ? "" : colocatedWithRegion.getFullPath();
+ PRPersistentConfig config = dsi.getPersistentPRConfig(this.getFullPath());
+ if(config != null) {
+ if (config.getTotalNumBuckets() != this.getTotalNumberOfBuckets()) {
+ Object[] prms = new Object[] { this.getFullPath(), this.getTotalNumberOfBuckets(),
+ config.getTotalNumBuckets() };
+ IllegalStateException ise = new IllegalStateException(
+ LocalizedStrings.PartitionedRegion_FOR_REGION_0_TotalBucketNum_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms));
+ throw ise;
+ }
+ //Make sure we don't change to be colocated with a different region
+ //We also can't change from colocated to not colocated without writing
+ //a record to disk, so we won't allow that right now either.
+ if (!colocatedWith.equals(config.getColocatedWith())) {
+ Object[] prms = new Object[] { this.getFullPath(), colocatedWith,
+ config.getColocatedWith() };
+ DiskAccessException dae = new DiskAccessException(LocalizedStrings.LocalRegion_A_DISKACCESSEXCEPTION_HAS_OCCURED_WHILE_WRITING_TO_THE_DISK_FOR_REGION_0_THE_REGION_WILL_BE_CLOSED.toLocalizedString(this.getFullPath()), null, dsi);
+ dsi.handleDiskAccessException(dae);
+ IllegalStateException ise = new IllegalStateException(
+ LocalizedStrings.PartitionedRegion_FOR_REGION_0_ColocatedWith_1_SHOULD_NOT_BE_CHANGED_Previous_Configured_2.toString(prms));
+ throw ise;
+ }
+ } else {
+
+ config= new PRPersistentConfig(this.getTotalNumberOfBuckets(),
+ colocatedWith);
+ dsi.addPersistentPR(this.getFullPath(), config);
+ //Fix for support issue 7870 - the parent region needs to be able
+ //to discover that there is a persistent colocated child region. So
+ //if this is a child region, persist its config to the parent disk store
+ //as well.
+ if(colocatedWithRegion != null
+ && colocatedWithRegion.getDiskStore() != null
+ && colocatedWithRegion.getDiskStore() != dsi) {
+ colocatedWithRegion.getDiskStore().addPersistentPR(this.getFullPath(), config);
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Initializes the PartitionedRegion meta data, adding this Node and starting
+ * the service on this node (if not already started).
+ * Made this synchronized for bug 41982
+ * @return true if initialize was done; false if not because it was destroyed
+ */
+ private synchronized boolean initPRInternals(InternalRegionArguments internalRegionArgs) {
+
+ if (this.isLocallyDestroyed) {
+ // don't initialize if we are already destroyed for bug 41982
+ return false;
+ }
+ /* Initialize the PartitionRegion */
+ if (cache.isCacheAtShutdownAll()) {
+ throw new CacheClosedException("Cache is shutting down");
+ }
+ validator.validateColocation();
+
+ //Do this after the validation, to avoid creating a persistent config
+ //for an invalid PR.
+ createAndValidatePersistentConfig();
+ initializePartitionedRegion();
+
+ /* set the total number of buckets */
+ // setTotalNumOfBuckets();
+ // If localMaxMemory is set to 0, do not initialize Data Store.
+ final boolean storesData = this.localMaxMemory > 0;
+ if (storesData) {
+ initializeDataStore(this.getAttributes());
+ }
+
+ // register this PartitionedRegion, Create a PartitionRegionConfig and bind
+ // it into the allPartitionedRegion system wide Region.
+ // IMPORTANT: do this before advising peers that we have this region
+ registerPartitionedRegion(storesData);
+
+ getRegionAdvisor().initializeRegionAdvisor(); // must be BEFORE initializeRegion call
+ getRegionAdvisor().addMembershipListener(this.advisorListener); // fix for bug 38719
+
+ // 3rd part of eviction attributes validation, after eviction attributes
+ // have potentially been published (by the first VM) but before buckets are created
+ validator.validateEvictionAttributesAgainstLocalMaxMemory();
+ validator.validateFixedPartitionAttributes();
+
+ // Register with the other Nodes that have this region defined, this
+ // allows for an Advisor profile exchange, also notifies the Admin
+ // callbacks that this Region is created.
+ try {
+ new CreateRegionProcessor(this).initializeRegion();
+ } catch (IllegalStateException e) {
+ // If this is a PARTITION_PROXY then retry region creation
+ // after toggling the concurrencyChecksEnabled flag. This is
+ // required because for persistent regions, we enforce concurrencyChecks
+ if (!this.isDataStore() && supportsConcurrencyChecks()) {
+ this.concurrencyChecksEnabled = !this.concurrencyChecksEnabled;
+ new CreateRegionProcessor(this).initializeRegion();
+ } else {
+ throw e;
+ }
+ }
+
+ if (!this.isDestroyed && !this.isLocallyDestroyed) {
+ // Register at this point so that other members are known
+ this.cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
+ }
+
+ // Create OQL indexes before starting GII.
+ createOQLIndexes(internalRegionArgs);
+
+ // if any other services are dependent on notifications from this region,
+ // then we need to make sure that in-process ops are distributed before
+ // releasing the GII latches
+ if (this.isAllEvents()) {
+ StateFlushOperation sfo = new StateFlushOperation(getDistributionManager());
+ try {
+ sfo.flush(this.distAdvisor.adviseAllPRNodes(),
+ getDistributionManager().getId(),
+ DistributionManager.HIGH_PRIORITY_EXECUTOR, false);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ getCancelCriterion().checkCancelInProgress(ie);
+ }
+ }
+
+ releaseBeforeGetInitialImageLatch(); // moved to this spot for bug 36671
+
+ // requires prid assignment mthomas 4/3/2007
+ getRegionAdvisor().processProfilesQueuedDuringInitialization();
+
+ releaseAfterBucketMetadataSetupLatch();
+
+ try {
+ if(storesData) {
+ if(this.redundancyProvider.recoverPersistentBuckets()) {
+ //Mark members as recovered from disk recursively, starting
+ //with the leader region.
+ PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(this);
+ markRecoveredRecursively(leaderRegion);
+ }
+ }
+ }
+ catch (RegionDestroyedException rde) {
+ // Do nothing.
+ if (logger.isDebugEnabled()) {
+ logger.debug("initPRInternals: failed due to exception", rde);
+ }
+ }
+
+ releaseAfterGetInitialImageLatch();
+
+ try {
+ if(storesData) {
+ this.redundancyProvider.scheduleCreateMissingBuckets();
+
+ if (this.redundantCopies > 0) {
+ this.redundancyProvider.startRedundancyRecovery();
+ }
+ }
+ }
+ catch (RegionDestroyedException rde) {
+ // Do nothing.
+ if (logger.isDebugEnabled()) {
+ logger.debug("initPRInternals: failed due to exception", rde);
+ }
+ }
+
+ return true;
+ }
+
+ private void markRecoveredRecursively(PartitionedRegion region) {
+ region.setRecoveredFromDisk();
+ for(PartitionedRegion colocatedRegion : ColocationHelper.getColocatedChildRegions(region)) {
+ markRecoveredRecursively(colocatedRegion);
+ }
+ }
+
+ @Override
+ protected void postCreateRegion() {
+ super.postCreateRegion();
+ CacheListener[] listeners = fetchCacheListenersField();
+ if (listeners != null && listeners.length > 0) {
+ Set others = getRegionAdvisor().adviseGeneric();
+ for (int i = 0; i < listeners.length; i++) {
+ if (listeners[i] instanceof RegionMembershipListener) {
+ RegionMembershipListener rml = (RegionMembershipListener)listeners[i];
+ try {
+ DistributedMember[] otherDms = new DistributedMember[others
+ .size()];
+ others.toArray(otherDms);
+ rml.initialMembers(this, otherDms);
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ logger.error(LocalizedMessage.create(LocalizedStrings.DistributedRegion_EXCEPTION_OCCURRED_IN_REGIONMEMBERSHIPLISTENER), t);
+ }
+ }
+ }
+ }
+
+ PartitionListener[] partitionListeners = this.getPartitionListeners();
+ if (partitionListeners != null && partitionListeners.length != 0) {
+ for (int i = 0; i < partitionListeners.length; i++) {
+ PartitionListener listener = partitionListeners[i];
+ if (listener != null) {
+ listener.afterRegionCreate(this);
+ }
+ }
+ }
+
+ Set<String> allGatewaySenderIds = getAllGatewaySenderIds();
+ if (!allGatewaySenderIds.isEmpty()) {
+ for (GatewaySender sender : cache.getAllGatewaySenders()) {
+ if (sender.isParallel()
+ && allGatewaySenderIds.contains(sender.getId())) {
+ /**
+ * get the ParallelGatewaySender to create the colocated partitioned
+ * region for this region.
+ */
+ if (sender.isRunning() ) {
+ AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender;
+ ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(new RegionQueue[1])[0])
+ .addShadowPartitionedRegionForUserPR(this);
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * Initializes the PartitionedRegion. OVERRIDES
+ */
+ @Override
+ protected void initialize(InputStream snapshotInputStream,
+ InternalDistributedMember imageTarget,
+ InternalRegionArguments internalRegionArgs) throws TimeoutException,
+ ClassNotFoundException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("PartitionedRegion#initialize {}", getName());
+ }
+ RegionLogger.logCreate(getName(), getDistributionManager().getDistributionManagerId());
+
+ this.requiresNotification = this.cache.requiresNotificationFromPR(this);
+ initPRInternals(internalRegionArgs);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("PartitionRegion#initialize: finished with {}", this);
+ }
+ this.cache.addPartitionedRegion(this);
+
+ }
+
+ /**
+ * Initializes the Node for this Map.
+ */
+ private Node initializeNode() {
+ return new Node(getDistributionManager().getId(),
+ SERIAL_NUMBER_GENERATOR.getAndIncrement());
+ }
+
+ /**
+ * receive notification that a bridge server or wan gateway has been created
+ * that requires notification of cache events from this region
+ */
+ public void cacheRequiresNotification() {
+ if (!this.requiresNotification
+ && !(this.isClosed || this.isLocallyDestroyed)) {
+ // tell others of the change in status
+ this.requiresNotification = true;
+ new UpdateAttributesProcessor(this).distribute(false);
+ }
+ }
+
+ @Override
+ void distributeUpdatedProfileOnHubCreation()
+ {
+ if (!(this.isClosed || this.isLocallyDestroyed)) {
+ // tell others of the change in status
+ this.requiresNotification = true;
+ new UpdateAttributesProcessor(this).distribute(false);
+ }
+ }
+
+ @Override
+ void distributeUpdatedProfileOnSenderCreation()
+ {
+ if (!(this.isClosed || this.isLocallyDestroyed)) {
+ // tell others of the change in status
+ this.requiresNotification = true;
+ new UpdateAttributesProcessor(this).distribute(false);
+ }
+ }
+
+ public void addGatewaySenderId(String gatewaySenderId) {
+ super.addGatewaySenderId(gatewaySenderId);
+ new UpdateAttributesProcessor(this).distribute();
+ ((PartitionedRegion)this).distributeUpdatedProfileOnSenderCreation();
+ GatewaySender sender = getCache().getGatewaySender(gatewaySenderId);
+ if (sender!= null && sender.isParallel() && sender.isRunning()) {
+ AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender;
+ ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(
+ new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this);
+ }
+ }
+
+ public void removeGatewaySenderId(String gatewaySenderId){
+ super.removeGatewaySenderId(gatewaySenderId);
+ new UpdateAttributesProcessor(this).distribute();
+ }
+
+ public void addAsyncEventQueueId(String asyncEventQueueId) {
+ super.addAsyncEventQueueId(asyncEventQueueId);
+ new UpdateAttributesProcessor(this).distribute();
+ ((PartitionedRegion)this).distributeUpdatedProfileOnSenderCreation();
+ GatewaySender sender = getCache().getGatewaySender(AsyncEventQueueImpl.getSenderIdFromAsyncEventQueueId(asyncEventQueueId));
+ if (sender!= null && sender.isParallel() && sender.isRunning()) {
+ AbstractGatewaySender senderImpl = (AbstractGatewaySender)sender;
+ ((ConcurrentParallelGatewaySenderQueue)senderImpl.getQueues().toArray(
+ new RegionQueue[1])[0]).addShadowPartitionedRegionForUserPR(this);
+ }
+ }
+
+ public void removeAsyncEventQueueId(String asyncEventQueueId) {
+ super.removeAsyncEventQueueId(asyncEventQueueId);
+ new UpdateAttributesProcessor(this).distribute();
+ }
+
+ public void checkSameSenderIdsAvailableOnAllNodes() {
+ List senderIds = this.getCacheDistributionAdvisor()
+ .adviseSameGatewaySenderIds(getGatewaySenderIds());
+ if (!senderIds.isEmpty()) {
+ throw new GatewaySenderConfigurationException(
+ LocalizedStrings.Region_REGION_0_HAS_1_GATEWAY_SENDER_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_GATEWAY_SENDER_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_GATEWAY_SENDER_IDS_SHOULD_BE_SAME
+ .toLocalizedString(new Object[] { this.getName(),
+ senderIds.get(0), senderIds.get(1) }));
+ }
+
+ List asycnQueueIds = this.getCacheDistributionAdvisor()
+ .adviseSameAsyncEventQueueIds(getAsyncEventQueueIds());
+ if (!asycnQueueIds.isEmpty()) {
+ throw new GatewaySenderConfigurationException(
+ LocalizedStrings.Region_REGION_0_HAS_1_ASYNC_EVENT_QUEUE_IDS_ANOTHER_CACHE_HAS_THE_SAME_REGION_WITH_2_ASYNC_EVENT_QUEUE_IDS_FOR_REGION_ACROSS_ALL_MEMBERS_IN_DS_ASYNC_EVENT_QUEUE_IDS_SHOULD_BE_SAME
+ .toLocalizedString(new Object[] { this.getName(),
+ asycnQueueIds.get(0), asycnQueueIds.get(1) }));
+ }
+ }
+
+ /**
+ * Initializes the PartitionedRegion - create the Global regions for storing
+ * the PartitiotnedRegion configs.
+ */
+ private void initializePartitionedRegion() {
+ this.prRoot = PartitionedRegionHelper.getPRRoot(getCache());
+ }
+
+ @Override
+ public void remoteRegionInitialized(CacheProfile profile) {
+ if (isInitialized() && hasListener()) {
+ Object callback = DistributedRegion.TEST_HOOK_ADD_PROFILE? profile : null;
+ RegionEventImpl event = new RegionEventImpl(PartitionedRegion.this,
+ Operation.REGION_CREATE, callback, true, profile.peerMemberId);
+ dispatchListenerEvent(EnumListenerEvent.AFTER_REMOTE_REGION_CREATE,
+ event);
+ }
+ }
+
+ /**
+ * This method initializes the partitionedRegionDataStore for this PR.
+ *
+ * @param ra
+ * Region attributes
+ */
+ private void initializeDataStore(RegionAttributes ra) {
+
+ this.dataStore = PartitionedRegionDataStore.createDataStore(cache, this, ra
+ .getPartitionAttributes());
+ }
+
+ protected DistributedLockService getPartitionedRegionLockService() {
+ return getGemFireCache().getPartitionedRegionLockService();
+ }
+
+ /**
+ * Register this PartitionedRegion by: 1) Create a PartitionRegionConfig and
+ * 2) Bind it into the allPartitionedRegion system wide Region.
+ *
+ * @param storesData
+ * which indicates whether the instance in this cache stores
+ * data, effecting the Nodes PRType
+ *
+ * @see Node#setPRType(int)
+ */
+ private void registerPartitionedRegion(boolean storesData) {
+ // Register this ParitionedRegion. First check if the ParitionedRegion
+ // entry already exists globally.
+ PartitionRegionConfig prConfig = null;
+ PartitionAttributes prAttribs = getAttributes().getPartitionAttributes();
+ if (storesData) {
+ if (this.fixedPAttrs != null) {
+ this.node.setPRType(Node.FIXED_PR_DATASTORE);
+ } else {
+ this.node.setPRType(Node.ACCESSOR_DATASTORE);
+ }
+ this.node.setPersistence(getAttributes().getDataPolicy() == DataPolicy.PERSISTENT_PARTITION);
+ byte loaderByte = (byte)(getAttributes().getCacheLoader() != null ? 0x01 : 0x00);
+ byte writerByte = (byte)(getAttributes().getCacheWriter() != null ? 0x02 : 0x00);
+ this.node.setLoaderWriterByte((byte)(loaderByte + writerByte));
+ }
+ else {
+ if (this.fixedPAttrs != null) {
+ this.node.setPRType(Node.FIXED_PR_ACCESSOR);
+ } else {
+ this.node.setPRType(Node.ACCESSOR);
+ }
+ }
+ final RegionLock rl = getRegionLock();
+ try {
+ // if (!rl.lock()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("registerPartitionedRegion: obtaining lock");
+ }
+ rl.lock();
+ checkReadiness();
+
+ prConfig = this.prRoot.get(getRegionIdentifier());
+
+ if (prConfig == null) {
+ validateParalleGatewaySenderIds();
+ this.partitionedRegionId = generatePRId(getSystem());
+ prConfig = new PartitionRegionConfig(this.partitionedRegionId,
+ this.getFullPath(), prAttribs, this.getScope(),
+ getAttributes().getEvictionAttributes(),
+ getAttributes().getRegionIdleTimeout(),
+ getAttributes().getRegionTimeToLive(),
+ getAttributes().getEntryIdleTimeout(),
+ getAttributes().getEntryTimeToLive(),
+ this.getAllGatewaySenderIds());
+ logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_BORN_WITH_PRID_1_IDENT_2,
+ new Object[] { getFullPath(), Integer.valueOf(this.partitionedRegionId), getRegionIdentifier()}));
+
+ PRSanityCheckMessage.schedule(this);
+ }
+ else {
+ validator.validatePartitionAttrsFromPRConfig(prConfig);
+ if (storesData) {
+ validator.validatePersistentMatchBetweenDataStores(prConfig);
+ validator.validateCacheLoaderWriterBetweenDataStores(prConfig);
+ validator.validateFixedPABetweenDataStores(prConfig);
+ }
+
+ this.partitionedRegionId = prConfig.getPRId();
+ logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_PARTITIONED_REGION_0_IS_CREATED_WITH_PRID_1,
+ new Object[] { getFullPath(), Integer.valueOf(this.partitionedRegionId)}));
+ }
+
+ synchronized (prIdToPR) {
+ prIdToPR.put(Integer.valueOf(this.partitionedRegionId), this); // last
+ }
+ prConfig.addNode(this.node);
+ if (this.getFixedPartitionAttributesImpl() != null) {
+ calculateStartingBucketIDs(prConfig);
+ }
+ updatePRConfig(prConfig, false);
+ /*
+ * try { if (this.redundantCopies > 0) { if (storesData) {
+ * this.dataStore.grabBackupBuckets(false); } } } catch
+ * (RegionDestroyedException rde) { if (!this.isClosed) throw rde; }
+ */
+ this.cleanPRRegistration = true;
+ }
+ catch (LockServiceDestroyedException lsde) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("registerPartitionedRegion: unable to obtain lock for {}", this);
+ }
+ cleanupFailedInitialization();
+ throw new PartitionedRegionException(
+ LocalizedStrings.PartitionedRegion_CAN_NOT_CREATE_PARTITIONEDREGION_FAILED_TO_ACQUIRE_REGIONLOCK
+ .toLocalizedString(), lsde);
+ }
+ catch (IllegalStateException ill) {
+ cleanupFailedInitialization();
+ throw ill;
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ String registerErrMsg =
+ LocalizedStrings.PartitionedRegion_AN_EXCEPTION_WAS_CAUGHT_WHILE_REGISTERING_PARTITIONEDREGION_0_DUMPPRID_1
+ .toLocalizedString(new Object[] {getFullPath(), prIdToPR.dump()});
+ try {
+ synchronized (prIdToPR) {
+ if (prIdToPR.containsKey(Integer.valueOf(this.partitionedRegionId))) {
+ prIdToPR.put(Integer.valueOf(this.partitionedRegionId),
+ PRIdMap.FAILED_REGISTRATION, false);
+ logger.info(LocalizedMessage.create(LocalizedStrings.PartitionedRegion_FAILED_REGISTRATION_PRID_0_NAMED_1,
+ new Object[] {Integer.valueOf(this.partitionedRegionId), this.getName()}));
+ }
+ }
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable ignore) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Partitioned Region creation, could not clean up after caught exception", ignore);
+ }
+ }
+ throw new PartitionedRegionException(registerErrMsg, t);
+ }
+ finally {
+ try {
+ rl.unlock();
+ if (logger.isDebugEnabled()) {
+ logger.debug("registerPartitionedRegion: released lock");
+ }
+ }
+ catch (Exception es) {
+ if (logger.isDebugEnabled()) {
+ logger.warn(es.getMessage(), es);
+ }
+ }
+ }
+ }
+
+ public void validateParalleGatewaySenderIds() throws PRLocallyDestroyedException{
+ for (String senderId : this.getParallelGatewaySenderIds()) {
+ for (PartitionRegionConfig config : this.prRoot.values()) {
+ if (config.getGatewaySenderIds().contains(senderId)) {
+ Map<String, PartitionedRegion> colocationMap = ColocationHelper
+ .getAllColocationRegions(this);
+ if (!colocationMap.isEmpty()) {
+ if (colocationMap.containsKey(config.getFullPath())) {
+ continue;
+ }
+ else {
+ int prID = config.getPRId();
+ PartitionedRegion colocatedPR = PartitionedRegion
+ .getPRFromId(prID);
+ PartitionedRegion leader = ColocationHelper
+ .getLeaderRegion(colocatedPR);
+ if (colocationMap.containsValue(leader)) {
+ continue;
+ }
+ else {
+ throw new IllegalStateException(
+ LocalizedStrings.PartitionRegion_NON_COLOCATED_REGIONS_1_2_CANNOT_HAVE_SAME_PARALLEL_GATEWAY_SENDER_ID_2.toString(new Object[] {
+ this.getFullPath(),
+ config.getFullPath(),
+ senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX) ? "async event queue": "gateway sender",
+ senderId }));
+ }
+ }
+ }
+ else {
+ throw new IllegalStateException(
+ LocalizedStrings.PartitionRegion_NON_COLOCATED_REGIONS_1_2_CANNOT_HAVE_SAME_PARALLEL_GATEWAY_SENDER_ID_2.toString(new Object[] {
+ this.getFullPath(),
+ config.getFullPath(),
+ senderId.contains(AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX) ? "async event queue": "gateway sender",
+ senderId }));
+ }
+
+ }
+ }
+ }
+ }
+
+ /**
+ * @return whether this region requires event notification for all cache
+ * content changes from other nodes
+ */
+ public boolean getRequiresNotification() {
+ return this.requiresNotification;
+ }
+
+ /**
+ * Get the Partitioned Region identifier used for DLocks (Bucket and Region)
+ */
+ final public String getRegionIdentifier() {
+ return this.regionIdentifier;
+ }
+
+ void setRecoveredFromDisk() {
+ this.recoveredFromDisk = true;
+ new UpdateAttributesProcessor(this).distribute(false);
+ }
+
+ public final void updatePRConfig(PartitionRegionConfig prConfig,
+ boolean putOnlyIfUpdated) {
+ final Set<Node> nodes = prConfig.getNodes();
+ final PartitionedRegion colocatedRegion = ColocationHelper
+ .getColocatedRegion(this);
+ RegionLock colocatedLock = null;
+ boolean colocatedLockAcquired = false;
+ try {
+ boolean colocationComplete = false;
+ if (colocatedRegion != null && !prConfig.isColocationComplete() &&
+ // if the current node is marked uninitialized (SQLF DDL replay in
+ // progress) then colocation will definitely not be marked complete so
+ // avoid taking the expensive region lock
+ !getCache().isUnInitializedMember(getDistributionManager().getId())) {
+ colocatedLock = colocatedRegion.getRegionLock();
+ colocatedLock.lock();
+ colocatedLockAcquired = true;
+ final PartitionRegionConfig parentConf = this.prRoot
+ .get(colocatedRegion.getRegionIdentifier());
+ if (parentConf.isColocationComplete()
+ && parentConf.hasSameDataStoreMembers(prConfig)) {
+ colocationComplete = true;
+ // check if all the nodes have been initialized (SQLF bug #42089)
+ for (Node node : nodes) {
+ if (getCache().isUnInitializedMember(node.getMemberId())) {
+ colocationComplete = false;
+ break;
+ }
+ }
+ if (colocationComplete) {
+ prConfig.setColocationComplete();
+ }
+ }
+ }
+
+ if(isDataStore() && !prConfig.isFirstDataStoreCreated()) {
+ prConfig.setDatastoreCreated(getEvictionAttributes());
+ }
+ // N.B.: this put could fail with a CacheClosedException:
+ if (!putOnlyIfUpdated || colocationComplete) {
+ this.prRoot.put(getRegionIdentifier(), prConfig);
+ }
+ } finally {
+ if (colocatedLockAcquired) {
+ colocatedLock.unlock();
+ }
+ }
+ }
+
+ /**
+ *
+ * @param keyInfo
+ * @param access
+ * true if caller wants last accessed time updated
+ * @param allowTombstones - whether a tombstone can be returned
+ * @return TODO
+ */
+ @Override
+ protected Region.Entry<?, ?> nonTXGetEntry(KeyInfo keyInfo, boolean access, boolean allowTombstones) {
+ final long startTime = PartitionedRegionStats.startTime();
+ final Object key = keyInfo.getKey();
+ try {
+ int bucketId = keyInfo.getBucketId();
+ if (bucketId == KeyInfo.UNKNOWN_BUCKET) {
+ bucketId = PartitionedRegionHelper.getHashKey(this,
+ Operation.GET_ENTRY, key, null, null);
+ keyInfo.setBucketId(bucketId);
+ }
+ InternalDistributedMember targetNode = getOrCreateNodeForBucketRead(bucketId);
+ return getEntryInBucket(targetNode, bucketId, key, access, allowTombstones);
+ }
+ finally {
+ this.prStats.endGetEntry(startTime);
+ }
+ }
+
+ protected EntrySnapshot getEntryInBucket(
+ final DistributedMember targetNode, final int bucketId,
+ final Object key, boolean access, final boolean allowTombstones) {
+ final int retryAttempts = calcRetry();
+ if (logger.isTraceEnabled()) {
+ logger.trace("getEntryInBucket: " + "Key key={} ({}) from: {} bucketId={}",
+ key, key.hashCode(), targetNode, bucketStringForLogs(bucketId));
+ }
+ Integer bucketIdInt = Integer.valueOf(bucketId);
+ EntrySnapshot ret = null;
+ int count = 0;
+ RetryTimeKeeper retryTime = null;
+ InternalDistributedMember retryNode = (InternalDistributedMember)targetNode;
+ while (count <= retryAttempts) {
+ // Every continuation should check for DM cancellation
+ if (retryNode == null) {
+ checkReadiness();
+ if (retryTime == null) {
+ retryTime = new RetryTimeKeeper(this.retryTimeout);
+ }
+ if (retryTime.overMaximum()) {
+ break;
+ }
+ retryNode = getOrCreateNodeForBucketRead(bucketId);
+
+ // No storage found for bucket, early out preventing hot loop, bug 36819
+ if (retryNode == null) {
+ checkShutdown();
+ return null;
+ }
+ continue;
+ }
+ try {
+ final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId());
+ if (loc) {
+ ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true);
+ } else {
+ ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones);
+ // TODO:Suranjan&Yogesh : there should be better way than this one
+ String name = Thread.currentThread().getName();
+ if (name.startsWith("ServerConnection")
+ && !getMyId().equals(targetNode)) {
+ setNetworkHop(bucketIdInt, (InternalDistributedMember)targetNode);
+ }
+ }
+
+ return ret;
+ }
+ catch (PRLocallyDestroyedException pde) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("getEntryInBucket: Encountered PRLocallyDestroyedException ");
+ }
+ checkReadiness();
+ }
+ catch (EntryNotFoundException enfe) {
+ return null;
+ }
+ catch (ForceReattemptException prce) {
+ prce.checkKey(key);
+ if (logger.isDebugEnabled()) {
+ logger.debug("getEntryInBucket: retrying, attempts so far: {}", count, prce);
+ }
+ checkReadiness();
+ InternalDistributedMember lastNode = retryNode;
+ retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue());
+ if (lastNode.equals(retryNode)) {
+ if (retryTime == null) {
+ retryTime = new RetryTimeKeeper(this.retryTimeout);
+ }
+ if (retryTime.overMaximum()) {
+ break;
+ }
+ retryTime.waitToRetryNode();
+ }
+ }
+ catch (PrimaryBucketException notPrimary) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Bucket {} on Node {} not primary", notPrimary.getLocalizedMessage(), retryNode);
+ }
+ getRegionAdvisor().notPrimary(bucketIdInt.intValue(), retryNode);
+ retryNode = getOrCreateNodeForBucketRead(bucketIdInt.intValue());
+ }
+
+ // It's possible this is a GemFire thread e.g. ServerConnection
+ // which got to this point because of a distributed system shutdown or
+ // region closure which uses interrupt to break any sleep() or wait()
+ // calls
+ // e.g. waitForPrimary
+ checkShutdown();
+
+ count++;
+ if (count == 1) {
+ this.prStats.incContainsKeyValueOpsRetried();
+ }
+ this.prStats.incContainsKeyValueRetries();
+
+ }
+
+ PartitionedRegionDistributionException e = null; // Fix for bug 36014
+ if (logger.isDebugEnabled()) {
+ e = new PartitionedRegionDistributionException(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS.toLocalizedString(Integer.valueOf(count)));
+ }
+ logger.warn(LocalizedMessage.create(LocalizedStrings.PartitionRegion_NO_VM_AVAILABLE_FOR_GETENTRY_IN_0_ATTEMPTS, Integer.valueOf(count)), e);
+ return null;
+ }
+
+ /**
+ * Check for region closure, region destruction, cache closure as well as
+ * distributed system disconnect. As of 6/21/2007, there were at least four
+ * volatile variables reads and one synchonrization performed upon completion
+ * of this method.
+ */
+ private void checkShutdown() {
+ checkReadiness();
+ this.cache.getCancelCriterion().checkCancelInProgress(null);
+ }
+
+ /**
+ * Checks if a key is contained remotely.
+ *
+ * @param targetNode
+ * the node where bucket region for the key exists.
+ * @param bucketId
+ * the bucket id for the key.
+ * @param key
+ * the key, whose value needs to be checks
+ * @param access
+ * true if caller wants last access time updated
+ * @param allowTombstones whether tombstones should be returned
+ * @throws EntryNotFoundException
+ * if the entry doesn't exist
+ * @throws ForceReattemptException
+ * if the peer is no longer available
+ * @throws PrimaryBucketException
+ * @return true if the passed key is contained remotely.
+ */
+ public EntrySnapshot getEntryRemotely(
+ InternalDistributedMember targetNode,
+ Integer bucketId, Object key, boolean access, boolean allowTombstones)
+ throws EntryNotFoundException, PrimaryBucketException,
+ ForceReattemptException {
+ FetchEntryResponse r = FetchEntryMessage
+ .send(targetNode, this, key, access);
+ this.prStats.incPartitionMessagesSent();
+ EntrySnapshot entry = r.waitForResponse();
+ if (entry != null && entry.getRawValue() == Token.TOMBSTONE){
+ if (!allowTombstones) {
+ return null;
+ }
+ }
+ return entry;
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Following methods would throw, operation Not Supported Exception
+ // /////////////////////////////////////////////////////////////////
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void becomeLockGrantor() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ final public Region createSubregion(String subregionName,
+ RegionAttributes regionAttributes) throws RegionExistsException,
+ TimeoutException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public Lock getDistributedLock(Object key) throws IllegalStateException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public CacheStatistics getStatistics() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ public Region getSubregion() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public Lock getRegionDistributedLock() throws IllegalStateException {
+
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void loadSnapshot(InputStream inputStream) throws IOException,
+ ClassNotFoundException, CacheWriterException, TimeoutException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Should it destroy entry from local accessor?????
+ * OVERRIDES
+ */
+ @Override
+ public void localDestroy(Object key, Object aCallbackArgument)
+ throws EntryNotFoundException {
+
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void localInvalidate(Object key, Object aCallbackArgument)
+ throws EntryNotFoundException {
+
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void localInvalidateRegion(Object aCallbackArgument) {
+ getDataView().checkSupportsRegionInvalidate();
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Executes a query on this PartitionedRegion. The restrictions have already
+ * been checked. The query is a SELECT expression, and the only region it
+ * refers to is this region.
+ *
+ * @see DefaultQuery#execute()
+ *
+ * @since 5.1
+ */
+ public Object executeQuery(DefaultQuery query, Object[] parameters,
+ Set buckets) throws FunctionDomainException, TypeMismatchException,
+ NameResolutionException, QueryInvocationTargetException {
+ for (;;) {
+ try {
+ return doExecuteQuery(query, parameters, buckets);
+ } catch (ForceReattemptException fre) {
+ // fall through and loop
+ }
+ }
+ }
+ /**
+ * If ForceReattemptException is thrown then the caller must loop and call us again.
+ * @throws ForceReattemptException if one of the buckets moved out from under us
+ */
+ private Object doExecuteQuery(DefaultQuery query, Object[] parameters,
+ Set buckets)
+ throws FunctionDomainException, TypeMismatchException,
+ NameResolutionException, QueryInvocationTargetException,
+ ForceReattemptException
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing query :{}", query);
+ }
+
+ HashSet<Integer> allBuckets = new HashSet<Integer>();
+
+ if (buckets==null) { // remote buckets
+ final Iterator remoteIter = getRegionAdvisor().getBucketSet().iterator();
+ try {
+ while (remoteIter.hasNext()) {
+ allBuckets.add((Integer)remoteIter.next());
+ }
+ }
+ catch (NoSuchElementException stop) {
+ }
+ }
+ else { // local buckets
+ Iterator localIter = null;
+ if (this.dataStore != null) {
+ localIter = buckets.iterator();
+ }
+ else {
+ localIter = Collections.EMPTY_SET.iterator();
+ }
+ try {
+ while (localIter.hasNext()) {
+ allBuckets.add((Integer)localIter.next());
+ }
+ }
+ catch (NoSuchElementException stop) {
+ }
+ }
+
+ if (allBuckets.size() == 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("No bucket storage allocated. PR has no data yet.");
+ }
+ ResultsSet resSet = new ResultsSet();
+ resSet.setElementType(new ObjectTypeImpl(
+ this.getValueConstraint() == null ? Object.class : this
+ .getValueConstraint()));
+ return resSet;
+ }
+
+ CompiledSelect selectExpr = query.getSimpleSelect();
+ if (selectExpr == null) {
+ throw new IllegalArgumentException(
+ LocalizedStrings.
+ PartitionedRegion_QUERY_MUST_BE_A_SELECT_EXPRESSION_ONLY
+ .toLocalizedString());
+ }
+
+ // this can return a BAG even if it's a DISTINCT select expression,
+ // since the expectation is that the duplicates will be removed at the end
+ SelectResults results = selectExpr
+ .getEmptyResultSet(parameters, getCache(), query);
+
+ PartitionedRegionQueryEvaluator prqe = new PartitionedRegionQueryEvaluator(this.getSystem(), this, query,
+ parameters, results, allBuckets);
+ for (;;) {
+ this.getCancelCriterion().checkCancelInProgress(null);
+ boolean interrupted = Thread.interrupted();
+ try {
+ results = prqe.queryBuckets(null);
+ break;
+ }
+ catch (InterruptedException e) {
+ interrupted = true;
+ }
+ catch (FunctionDomainException e) {
+ throw e;
+ }
+ catch (TypeMismatchException e) {
+ throw e;
+ }
+ catch (NameResolutionException e) {
+ throw e;
+ }
+ catch (QueryInvocationTargetException e) {
+ throw e;
+ }
+ catch (QueryException qe) {
+ throw new QueryInvocationTargetException(LocalizedStrings.PartitionedRegion_UNEXPECTED_QUERY_EXCEPTION_OCCURED_DURING_QUERY_EXECUTION_0.toLocalizedString(qe.getMessage()), qe);
+ }
+ finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // for
+
+ // Drop Duplicates if this is a DISTINCT query
+ boolean allowsDuplicates = results.getCollectionType().allowsDuplicates();
+ //Asif: No need to apply the limit to the SelectResults.
+ // We know that even if we do not apply the limit,
+ //the results will satisfy the limit
+ // as it has been evaluated in the iteration of List to
+ // populate the SelectsResuts
+ //So if the results is instance of ResultsBag or is a StructSet or
+ // a ResultsSet, if the limit exists, the data set size will
+ // be exactly matching the limit
+ if (selectExpr.isDistinct()) {
+ // don't just convert to a ResultsSet (or StructSet), since
+ // the bags can convert themselves to a Set more efficiently
+ ObjectType elementType = results.getCollectionType().getElementType();
+ if (selectExpr.getOrderByAttrs() != null) {
+ // Set limit also, its not applied while building the final result set as order by is involved.
+ // results = new ResultsCollectionWrapper(elementType, results.asSet(), query.getLimit(parameters));
+ } else if (allowsDuplicates) {
+ results = new ResultsCollectionWrapper(elementType, results.asSet());
+ }
+ if (selectExpr.isCount() && (results.isEmpty() || selectExpr.isDistinct())) {
+ SelectResults resultCount = new ResultsBag(getCachePerfStats());//Constructor with elementType not visible.
+ resultCount.setElementType(new ObjectTypeImpl(Integer.class));
+ ((ResultsBag)resultCount).addAndGetOccurence(results.size());
+ return resultCount;
+ }
+ }
+ return results;
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void saveSnapshot(OutputStream outputStream) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void writeToDisk() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @since 5.0
+ * @throws UnsupportedOperationException
+ * OVERRIDES
+ */
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ void basicLocalClear(RegionEventImpl event) {
+ throw new UnsupportedOperationException();
+ }
+
+ // /////////////////////////////////////////////////////////////////////
+ // ////////////// Operation Supported for this release
+ // //////////////////////////////
+ // /////////////////////////////////////////////////////////////////////
+
+ @Override
+ boolean virtualPut(EntryEventImpl event,
+ boolean ifNew,
+ boolean ifOld,
+ Object expectedOldValue,
+ boolean requireOldValue,
+ long lastModified,
+ boolean overwriteDestroyed)
+ throws TimeoutException, CacheWriterException {
+ final long startTime = PartitionedRegionStats.startTime();
+ boolean result = false;
+ final DistributedPutAllOperation putAllOp_save = event.setPutAllOperation(null);
+
+ if (event.getEventId() == null) {
+ event.setNewEventId(this.cache.getDistributedSystem());
+ }
+ boolean bucketStorageAssigned = true;
+ try {
+ final Integer bucketId = event.getKeyInfo().getBucketId();
+ assert bucketId != KeyInfo.UNKNOWN_BUCKET;
+ // check in bucket2Node region
+ InternalDistributedMember targetNode = getNodeForBucketWrite(bucketId
+ .intValue(), null);
+ // force all values to be serialized early to make size computation cheap
+ // and to optimize distribution.
+ if (logger.isDebugEnabled()) {
+ logger.debug("PR.virtualPut putting event={}", event);
+ }
+
+ if (targetNode == null) {
+ try {
+ bucketStorageAssigned=false;
+ // if this is a Delta update, then throw exception since the key doesn't
+ // exist if there is no bucket for it yet
+ // For HDFS region, we will recover key, so allow bucket creation
+ if (!this.dataPolicy.withHDFS() && event.hasDelta()) {
+ throw new EntryNotFoundException(LocalizedStrings.
+ PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY
+ .toLocalizedString());
+ }
+ targetNode = createBucket(bucketId.intValue(), event.getNewValSizeForPR(),
+ null);
+ }
+ catch (PartitionedRegionStorageException e) {
+ // try not to throw a PRSE if the cache is closing or this region was
+ // destroyed during createBucket() (bug 36574)
+ this.checkReadiness();
+ if (this.cache.isClosed()) {
+ throw new RegionDestroyedException(toString(), getFullPath());
+ }
+ throw e;
+ }
+ }
+
+ if (event.isBridgeEvent() && bucketStorageAssigned) {
+ setNetworkHop(bucketId, targetNode);
+ }
+ if (putAllOp_save == null) {
+ result = putInBucket(targetNode,
+ bucketId,
+ event,
+ ifNew,
+ ifOld,
+ expectedOldValue,
<TRUNCATED>