You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:35:55 UTC
[33/83] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 92b585a,0000000..f3e730a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@@ -1,4303 -1,0 +1,4311 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.InvalidDeltaException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.CacheWriter;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskAccessException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.LossAction;
+import com.gemstone.gemfire.cache.MembershipAttributes;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.RegionAccessException;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionDistributionException;
+import com.gemstone.gemfire.cache.RegionMembershipListener;
+import com.gemstone.gemfire.cache.ResumptionAction;
+import com.gemstone.gemfire.cache.RoleException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.persistence.PersistentReplicatesOfflineException;
+import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
+import com.gemstone.gemfire.distributed.Role;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.ProfileVisitor;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.MembershipListener;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
+import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
+import com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
+import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
+import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultWaiter;
+import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
+import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.persistence.CreatePersistentRegionProcessor;
+import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisor;
+import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisorImpl;
+import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
+import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberManager;
+import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberView;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.Chunk;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
+import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
+import com.gemstone.gemfire.i18n.StringId;
+/**
+ *
+ * @author Eric Zoerner
+ * @author Sudhir Menon
+ */
+@SuppressWarnings("deprecation")
+public class DistributedRegion extends LocalRegion implements
+ CacheDistributionAdvisee
+{
+ private static final Logger logger = LogService.getLogger();
+
+ /** causes cache profile to be added to afterRemoteRegionCreate notification for testing */
+ public static boolean TEST_HOOK_ADD_PROFILE = false;
+
+ /** Used to sync accesses to this.dlockService to allow lazy construction */
+ private final Object dlockMonitor = new Object();
+
+ final CacheDistributionAdvisor distAdvisor;
+
+ /**
+ * @guarded.By {@link #dlockMonitor}
+ */
+ private DistributedLockService dlockService;
+
+ protected final AdvisorListener advisorListener = new AdvisorListener();
+
+ /** Set of currently missing required roles */
+ protected final HashSet missingRequiredRoles = new HashSet();
+
+ /** True if this region is currently missing any required roles */
+ protected volatile boolean isMissingRequiredRoles = false;
+
+ /**
+ * True if this region is has any required roles defined and the LossAction is
+ * either NO_ACCESS or LIMITED_ACCESS. Reliability checks will only happen if
+ * this is true.
+ */
+ private final boolean requiresReliabilityCheck;
+
+ /**
+ * Provides a queue for reliable message delivery
+ *
+ * @since 5.0
+ */
+ protected final ReliableMessageQueue rmq;
+
+ /**
+ * Latch that is opened after initialization waits for required roles up to
+ * the <a href="DistributedSystem#member-timeout">member-timeout </a>.
+ */
+ protected final StoppableCountDownLatch initializationLatchAfterMemberTimeout;
+
+ private final PersistenceAdvisor persistenceAdvisor;
+
+ private final PersistentMemberID persistentId;
+
+ /**
+ * This boolean is set to false when this region
+ * is non-persistent, but there are persistent members in the distributed system
+ * to which all region modifications should be forwarded
+ * see bug 45186
+ */
+ private volatile boolean generateVersionTag = true;
+
+ /** Tests can set this to true and ignore reliability triggered reconnects */
+ public static boolean ignoreReconnect = false;
+
+ /**
+ * Lock to prevent multiple threads on this member from performing
+ * a clear at the same time.
+ */
+ private final Object clearLock = new Object();
+
+ private static AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
+
+ /** Creates a new instance of DistributedRegion */
+ protected DistributedRegion(String regionName, RegionAttributes attrs,
+ LocalRegion parentRegion, GemFireCacheImpl cache,
+ InternalRegionArguments internalRegionArgs) {
+ super(regionName, attrs, parentRegion, cache, internalRegionArgs);
+ this.initializationLatchAfterMemberTimeout = new StoppableCountDownLatch(
+ getCancelCriterion(), 1);
+ this.distAdvisor = createDistributionAdvisor(internalRegionArgs);
+
+ if (getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
+ && !isInternalRegion() && !attrs.getScope().isAck() && !doesNotDistribute() && attrs.getDataPolicy().withStorage()) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.DistributedRegion_REGION_0_1_SPLITBRAIN_CONFIG_WARNING,
+ new Object[] { regionName, attrs.getScope() }));
+ }
+ if (!getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
+ && attrs.getDataPolicy().withPersistence() && !loggedNetworkPartitionWarning.getAndSet(true)) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.DistributedRegion_REGION_0_ENABLE_NETWORK_PARTITION_WARNING,
+ new Object[] { regionName, attrs.getScope() }));
+ }
+
+ boolean setRequiresReliabilityCheck = attrs.getMembershipAttributes()
+ .hasRequiredRoles()
+ &&
+ // note that the following includes NO_ACCESS, LIMITED_ACCESS,
+ !attrs.getMembershipAttributes().getLossAction().isAllAccess()
+ && !attrs.getMembershipAttributes().getLossAction().isReconnect();
+
+ // this optimization is safe for as long as Roles and Required Roles are
+ // immutable
+ // if this VM fulfills all required roles, make requiresReliabilityCheck
+ // false
+ Set reqRoles = new HashSet(attrs.getMembershipAttributes()
+ .getRequiredRoles());
+ reqRoles.removeAll(getSystem().getDistributedMember().getRoles());
+ if (reqRoles.isEmpty()) {
+ setRequiresReliabilityCheck = false;
+ }
+
+ this.requiresReliabilityCheck = setRequiresReliabilityCheck;
+
+ {
+ ReliableMessageQueue tmp = null;
+ if (this.requiresReliabilityCheck) {
+ // if
+ // (attrs.getMembershipAttributes().getLossAction().isAllAccessWithQueuing())
+ // {
+ // tmp = cache.getReliableMessageQueueFactory().create(this);
+ // }
+ }
+ this.rmq = tmp;
+ }
+
+ if(internalRegionArgs.isUsedForPartitionedRegionBucket()) {
+ this.persistenceAdvisor = internalRegionArgs.getPersistenceAdvisor();
+ } else if (this.allowsPersistence()){
+ //TODO prpersist - using this lock service is a hack. Maybe? Or maybe
+ //it's ok if we have one (rarely used) lock service for many operations?
+ //What does the resource manager do?
+ DistributedLockService dl = cache.getPartitionedRegionLockService();
+ try {
+ //TODO prpersist - this is just a quick and dirty storage mechanism so that
+ //I can test the storage.
+ DiskRegionStats diskStats;
+ PersistentMemberView storage;
+ if(getDataPolicy().withPersistence()) {
+ storage = getDiskRegion();
+ diskStats = getDiskRegion().getStats();
+ } else {
+ storage = new InMemoryPersistentMemberView();
+ diskStats = null;
+ }
+ PersistentMemberManager memberManager = cache.getPersistentMemberManager();
+ this.persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage, this.getFullPath(), diskStats, memberManager);
+ } catch (Exception e) {
+ throw new InternalGemFireError("Couldn't recover persistence");
+ }
+ } else {
+ this.persistenceAdvisor = null;
+ }
+ if(this.persistenceAdvisor != null) {
+ this.persistentId = persistenceAdvisor.generatePersistentID();
+ } else {
+ this.persistentId = null;
+ }
+
+ }
+
+ @Override
+ public void createEventTracker() {
+ this.eventTracker = new EventTracker(this);
+ this.eventTracker.start();
+ }
+
+ /**
+ * Intended for used during construction of a DistributedRegion
+ *
+ * @return the advisor to be used by the region
+ */
+ protected CacheDistributionAdvisor createDistributionAdvisor(InternalRegionArguments internalRegionArgs) {
+ return CacheDistributionAdvisor.createCacheDistributionAdvisor(this); // Warning: potential early escape of object before full construction
+ }
+
+ /**
+ * Does this region support persistence?
+ */
+ public boolean allowsPersistence() {
+ return true;
+ }
+
+ @Override
+ public boolean requiresOneHopForMissingEntry(EntryEventImpl event) {
+ // received from another member - don't use one-hop
+ if (event.isOriginRemote()) {
+ return false;
+ }
+ // local ops aren't distributed
+ if (event.getOperation().isLocal()) {
+ return false;
+ }
+ // if it already has a valid version tag it can go out with a DistributedCacheOperation
+ if (event.getVersionTag() != null && event.getVersionTag().getRegionVersion() > 0) {
+ return false;
+ }
+ // if we're not allowed to generate a version tag we need to send it to someone who can
+ if (!this.generateVersionTag) {
+ return true;
+ }
+ return this.concurrencyChecksEnabled &&
+ (this.srp == null) &&
+ !isTX() &&
+ this.scope.isDistributed() &&
+ !this.dataPolicy.withReplication();
+ }
+
+
+ /**
+ * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object,
+ * boolean, long, boolean)
+ */
+ @Override
+ protected
+ boolean virtualPut(EntryEventImpl event,
+ boolean ifNew,
+ boolean ifOld,
+ Object expectedOldValue,
+ boolean requireOldValue,
+ long lastModified,
+ boolean overwriteDestroyed)
+ throws TimeoutException,
+ CacheWriterException {
+ final boolean isTraceEnabled = logger.isTraceEnabled();
+
+ Lock dlock = null;
+ if (this.scope.isGlobal() && // lock only applies to global scope
+ !event.isOriginRemote() && // only if operation originating locally
+ !event.isNetSearch() && // search and load processor handles own locking
+ !event.isNetLoad() &&
+ // @todo darrel/kirk: what about putAll?
+ !event.isLocalLoad() &&
+ !event.isSingleHopPutOp()) { // Single Hop Op means dlock is already taken at origin node.
+ dlock = this.getDistributedLockIfGlobal(event.getKey());
+ }
+ if (isTraceEnabled) {
+ logger.trace("virtualPut invoked for event {}", event);
+ }
+ try {
+ if (!hasSeenEvent(event)) {
+ if (this.requiresOneHopForMissingEntry(event)) {
+ // bug #45704: see if a one-hop must be done for this operation
+ RegionEntry re = getRegionEntry(event.getKey());
+ if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
+ if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
+ // putAll will send a single one-hop for empty regions. for other missing entries
+ // we need to get a valid version number before modifying the local cache
+ boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
+ false, false, expectedOldValue, requireOldValue, !this.generateVersionTag);
+
+ if (!didDistribute && isTraceEnabled) {
+ logger.trace("Unable to perform one-hop messaging");
+ }
+ if (!this.generateVersionTag && !didDistribute) {
+ throw new PersistentReplicatesOfflineException();
+ }
+ if (didDistribute) {
+ if (isTraceEnabled) {
+ logger.trace("Event after remotePut operation: {}", event);
+ }
+ if (event.getVersionTag() == null) {
+ // if the event wasn't applied by the one-hop replicate it will not have a version tag
+ // and so should not be applied to this cache
+ return false;
+ }
+ }
+ }
+ }
+ }
+ return super.virtualPut(event,
+ ifNew,
+ ifOld,
+ expectedOldValue,
+ requireOldValue,
+ lastModified,
+ overwriteDestroyed);
+ }
+ else {
+ if (event.getDeltaBytes() != null && event.getRawNewValue() == null) {
+ // This means that this event has delta bytes but no full value.
+ // Request the full value of this event.
+ // The value in this vm may not be same as this event's value.
+ throw new InvalidDeltaException(
+ "Cache encountered replay of event containing delta bytes for key "
+ + event.getKey());
+ }
+ // if the listeners have already seen this event, then it has already
+ // been successfully applied to the cache. Distributed messages and
+ // return
+ if (isTraceEnabled) {
+ logger.trace("DR.virtualPut: this cache has already seen this event {}", event);
+ }
+
+ // Gester, Fix 39014: when hasSeenEvent, put will still distribute
+ // event, but putAll did not. We add the logic back here, not to put
+ // back into DR.distributeUpdate() because we moved this part up into
+ // LR.basicPutPart3 in purpose. Reviewed by Bruce.
+ if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
+ event.getPutAllOperation().addEntry(event, true);
+ }
+
+ /* doing this so that other VMs will apply this no matter what. If it
+ * is an "update" they will not apply it if they don't have the key.
+ * Because this is probably a retry, it will never get applied to this
+ * local AbstractRegionMap, and so will never be flipped to a 'create'
+ */
+ event.makeCreate();
- distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
- event.invokeCallbacks(this,true, true);
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
++ event.invokeCallbacks(this,true, true);
++ }
+ return true;
+ }
+ }
+ finally {
+ if (dlock != null) {
+ dlock.unlock();
+ }
+ }
+ }
+
+ @Override
+ protected RegionEntry basicPutEntry(EntryEventImpl event, long lastModified)
+ throws TimeoutException, CacheWriterException {
+
+ final boolean isTraceEnabled = logger.isTraceEnabled();
+
+ if (isTraceEnabled) {
+ logger.trace("basicPutEntry invoked for event {}", event);
+ }
+ if (this.requiresOneHopForMissingEntry(event)) {
+ // bug #45704: see if a one-hop must be done for this operation
+ RegionEntry re = getRegionEntry(event.getKey());
+ if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
+ final boolean ifNew = false;
+ final boolean ifOld = false;
+ boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
+ ifNew, ifOld, null, false, !this.generateVersionTag);
+ if (!this.generateVersionTag && !didDistribute) {
+ throw new PersistentReplicatesOfflineException();
+ }
+ if (didDistribute && isTraceEnabled) {
+ logger.trace("Event after remotePut for basicPutEntry: {}", event);
+ }
+ }
+ }
+ return super.basicPutEntry(event, lastModified);
+ }
+
+ @Override
+ public void performPutAllEntry(EntryEventImpl event) {
+ /*
+ * force shared data view so that we just do the virtual op, accruing things in the put all operation for later
+ */
+ if(isTX()) {
+ event.getPutAllOperation().addEntry(event);
+ } else {
+ getSharedDataView().putEntry(event, false, false, null, false, 0L, false);
+ }
+ }
+
+ @Override
+ public void performRemoveAllEntry(EntryEventImpl event) {
+ // force shared data view so that we just do the virtual op, accruing things in the bulk operation for later
+ if(isTX()) {
+ event.getRemoveAllOperation().addEntry(event);
+ } else {
+ basicDestroy(event, true, null);
+ //getSharedDataView().destroyExistingEntry(event, true, null);
+ }
+ }
+
+ /**
+ * distribution and listener notification
+ */
+ @Override
+ public void basicPutPart3(EntryEventImpl event, RegionEntry entry,
+ boolean isInitialized, long lastModified, boolean invokeCallbacks,
+ boolean ifNew, boolean ifOld, Object expectedOldValue,
+ boolean requireOldValue) {
+
+ distributeUpdate(event, lastModified, false, false, null, false);
+ super.basicPutPart3(event, entry, isInitialized, lastModified,
+ invokeCallbacks, ifNew, ifOld, expectedOldValue, requireOldValue);
+ }
+
+ /** distribute an update operation */
+ protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
+ // an update from a netSearch is not distributed
+ if (!event.isOriginRemote() && !event.isNetSearch() && !event.isBulkOpInProgress()) {
+ boolean distribute = true;
+ if (event.getInhibitDistribution()) {
+ // this has already been distributed by a one-hop operation
+ distribute = false;
+ }
+ if (distribute) {
+ UpdateOperation op = new UpdateOperation(event, lastModified);
+ if (logger.isTraceEnabled()) {
+ logger.trace("distributing operation for event : {} : for region : {}", event, this.getName());
+ }
+ op.distribute();
+ }
+ }
+ }
+
+ protected void setGeneratedVersionTag(boolean generateVersionTag) {
+ // there is at-least one other persistent member, so turn on concurrencyChecks
+ enableConcurrencyChecks();
+
+ this.generateVersionTag = generateVersionTag;
+ }
+
+ protected boolean getGenerateVersionTag() {
+ return this.generateVersionTag;
+ }
+
+ @Override
+ protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("shouldGenerateVersionTag this.generateVersionTag={} ccenabled={} dataPolicy={} event:{}",
+ this.generateVersionTag, this.concurrencyChecksEnabled, this.dataPolicy, event);
+ }
+ if (!this.concurrencyChecksEnabled || this.dataPolicy == DataPolicy.EMPTY || !this.generateVersionTag) {
+ return false;
+ }
+ if (this.srp != null) { // client
+ return false;
+ }
+ if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
+ return false;
+ }
+ if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag
+ return false;
+ }
+ if (!event.isOriginRemote() && this.dataPolicy.withReplication()) {
+ return true;
+ }
+ if (!this.dataPolicy.withReplication() && !this.dataPolicy.withPersistence()) {
+ if (!entry.getVersionStamp().hasValidVersion()) {
+ // do not generate a version stamp in a region that has no replication if it's not based
+ // on an existing version from a replicate region
+ return false;
+ }
+ return true;
+ }
+ if (!event.isOriginRemote() && event.getDistributedMember() != null) {
+ if (!event.getDistributedMember().equals(this.getMyId())) {
+ return event.getVersionTag() == null; // one-hop remote message
+ }
+ }
+ return false;
+ }
+ /**
+ * Throws RegionAccessException if required roles are missing and the
+ * LossAction is NO_ACCESS
+ *
+ * @throws RegionAccessException
+ * if required roles are missing and the LossAction is NO_ACCESS
+ */
+ @Override
+ protected void checkForNoAccess()
+ {
+ if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
+ if (getMembershipAttributes().getLossAction().isNoAccess()) {
+ synchronized (this.missingRequiredRoles) {
+ if (!this.isMissingRequiredRoles)
+ return;
+ Set roles = Collections.unmodifiableSet(new HashSet(
+ this.missingRequiredRoles));
+ throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1.toLocalizedString(new Object[] {getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
+ }
+ }
+ }
+ }
+
+ /**
+ * Throws RegionAccessException is required roles are missing and the
+ * LossAction is either NO_ACCESS or LIMITED_ACCESS.
+ *
+ * @throws RegionAccessException
+ * if required roles are missing and the LossAction is either
+ * NO_ACCESS or LIMITED_ACCESS
+ */
+ @Override
+ protected void checkForLimitedOrNoAccess()
+ {
+ if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
+ if (getMembershipAttributes().getLossAction().isNoAccess()
+ || getMembershipAttributes().getLossAction().isLimitedAccess()) {
+ synchronized (this.missingRequiredRoles) {
+ if (!this.isMissingRequiredRoles)
+ return;
+ Set roles = Collections.unmodifiableSet(new HashSet(
+ this.missingRequiredRoles));
+ Assert.assertTrue(!roles.isEmpty());
+ throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1
+ .toLocalizedString(new Object[] { getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void handleReliableDistribution(ReliableDistributionData data,
+ Set successfulRecipients) {
+ handleReliableDistribution(data, successfulRecipients,
+ Collections.EMPTY_SET, Collections.EMPTY_SET);
+ }
+
+ protected void handleReliableDistribution(ReliableDistributionData data,
+ Set successfulRecipients, Set otherRecipients1, Set otherRecipients2)
+ {
+ if (this.requiresReliabilityCheck) {
+ MembershipAttributes ra = getMembershipAttributes();
+ Set recipients = successfulRecipients;
+ // determine the successful roles
+ Set roles = new HashSet();
+ for (Iterator iter = recipients.iterator(); iter.hasNext();) {
+ InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
+ if (mbr != null) {
+ roles.addAll(mbr.getRoles());
+ }
+ }
+ for (Iterator iter = otherRecipients1.iterator(); iter.hasNext();) {
+ InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
+ if (mbr != null) {
+ roles.addAll(mbr.getRoles());
+ }
+ }
+ for (Iterator iter = otherRecipients2.iterator(); iter.hasNext();) {
+ InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
+ if (mbr != null) {
+ roles.addAll(mbr.getRoles());
+ }
+ }
+ // determine the missing roles
+ Set failedRoles = new HashSet(ra.getRequiredRoles());
+ failedRoles.removeAll(roles);
+ if (failedRoles.isEmpty())
+ return;
+// if (rp.isAllAccessWithQueuing()) {
+// this.rmq.add(data, failedRoles);
+// } else {
+
+ throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_MAY_HAVE_FAILED_TO_NOTIFY_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
+// }
+ }
+ }
+
+ /**
+ *
+ * Called when we do a distributed operation and don't have anyone to
+ * distributed it too. Since this is only called when no distribution was done
+ * (i.e. no recipients) we do not check isMissingRequiredRoles because it
+ * might not longer be true due to race conditions
+ *
+ * @return false if this region has at least one required role and queuing is
+ * configured. Returns true if sending to no one is ok.
+ * @throws RoleException
+ * if a required role is missing and the LossAction is either
+ * NO_ACCESS or LIMITED_ACCESS.
+ * @since 5.0
+ */
+ protected boolean isNoDistributionOk()
+ {
+ if (this.requiresReliabilityCheck) {
+ MembershipAttributes ra = getMembershipAttributes();
+ // if (ra.getLossAction().isAllAccessWithQueuing()) {
+ // return !ra.hasRequiredRoles();
+ // } else {
+ Set failedRoles = ra.getRequiredRoles();
+ throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_WAS_NOT_DONE_TO_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
+ // }
+ }
+ return true;
+ }
+
+ /**
+ * returns true if this Region does not distribute its operations to other
+ * members.
+ * @since 6.0
+ * @see HARegion#localDestroyNoCallbacks(Object)
+ */
+ public boolean doesNotDistribute() {
+ return false;
+ }
+
+
+ @Override
+ public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
+ return !doesNotDistribute() && super.shouldSyncForCrashedMember(id);
+ }
+
+
+ /**
+ * Adjust the specified set of recipients by removing any of them that are
+ * currently having their data queued.
+ *
+ * @param recipients
+ * the set of recipients that a message is to be distributed too.
+ * Recipients that are currently having their data queued will be
+ * removed from this set.
+ * @return the set, possibly null, of recipients that are currently having
+ * their data queued.
+ * @since 5.0
+ */
+ protected Set adjustForQueuing(Set recipients)
+ {
+ Set result = null;
+ // if (this.requiresReliabilityCheck) {
+ // MembershipAttributes ra = getMembershipAttributes();
+ // if (ra.getLossAction().isAllAccessWithQueuing()) {
+ // Set currentQueuedRoles = this.rmq.getQueuingRoles();
+ // if (currentQueuedRoles != null) {
+ // // foreach recipient see if any of his roles are queued and if
+ // // they are remove him from recipients and add him to result
+ // Iterator it = recipients.iterator();
+ // while (it.hasNext()) {
+ // DistributedMember dm = (DistributedMember)it.next();
+ // Set dmRoles = dm.getRoles();
+ // if (!dmRoles.isEmpty()) {
+ // if (intersects(dmRoles, currentQueuedRoles)) {
+ // it.remove(); // fix for bug 34447
+ // if (result == null) {
+ // result = new HashSet();
+ // }
+ // result.add(dm);
+ // }
+ // }
+ // }
+ // }
+ // }
+ // }
+ return result;
+ }
+
+ /**
+ * Returns true if the two sets intersect
+ *
+ * @param a
+ * a non-null non-empty set
+ * @param b
+ * a non-null non-empty set
+ * @return true if sets a and b intersect; false if not
+ * @since 5.0
+ */
+ public static boolean intersects(Set a, Set b)
+ {
+ Iterator it;
+ Set target;
+ if (a.size() <= b.size()) {
+ it = a.iterator();
+ target = b;
+ }
+ else {
+ it = b.iterator();
+ target = a;
+ }
+ while (it.hasNext()) {
+ if (target.contains(it.next()))
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean requiresReliabilityCheck()
+ {
+ return this.requiresReliabilityCheck;
+ }
+
+ /**
+ * Returns true if the ExpiryTask is currently allowed to expire.
+ * <p>
+ * If the region is in NO_ACCESS due to reliability configuration, then no
+ * expiration actions are allowed.
+ * <p>
+ * If the region is in LIMITED_ACCESS due to reliability configuration, then
+ * only non-distributed expiration actions are allowed.
+ */
+ @Override
+ protected boolean isExpirationAllowed(ExpiryTask expiry)
+ {
+ if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
+ if (getMembershipAttributes().getLossAction().isNoAccess()) {
+ return false;
+ }
+ if (getMembershipAttributes().getLossAction().isLimitedAccess()
+ && expiry.isDistributedAction()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Performs the resumption action when reliability is resumed.
+ *
+ * @return true if asynchronous resumption is triggered
+ */
+ protected boolean resumeReliability(InternalDistributedMember id,
+ Set newlyAcquiredRoles)
+ {
+ boolean async = false;
+ try {
+ ResumptionAction ra = getMembershipAttributes().getResumptionAction();
+ if (ra.isNone()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reliability resumption for action of none");
+ }
+ resumeExpiration();
+ }
+ else if (ra.isReinitialize()) {
+ async = true;
+ asyncResumeReliability(id, newlyAcquiredRoles);
+ }
+ }
+ catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+ }
+ return async;
+ }
+
+ /**
+ * Handles asynchronous ResumptionActions such as region reinitialize.
+ */
+ private void asyncResumeReliability(final InternalDistributedMember id,
+ final Set newlyAcquiredRoles)
+ throws RejectedExecutionException {
+ final ResumptionAction ra = getMembershipAttributes().getResumptionAction();
+ getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ public void run()
+ {
+ try {
+ if (ra.isReinitialize()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reliability resumption for action of reinitialize");
+ }
+ if (!isDestroyed() && !cache.isClosed()) {
+ RegionEventImpl event = new RegionEventImpl(
+ DistributedRegion.this, Operation.REGION_REINITIALIZE, null,
+ false, getMyId(), generateEventID());
+ reinitialize(null, event);
+ }
+ synchronized (missingRequiredRoles) {
+ // any number of threads may be waiting on missingRequiredRoles
+ missingRequiredRoles.notifyAll();
+ if (hasListener() && id != null) {
+ // fire afterRoleGain event
+ RoleEventImpl relEvent = new RoleEventImpl(
+ DistributedRegion.this, Operation.REGION_CREATE, null,
+ true, id, newlyAcquiredRoles);
+ dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN,
+ relEvent);
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+ }
+ }
+ });
+ }
+
+ /** Reschedules expiry tasks when reliability is resumed. */
+ private void resumeExpiration()
+ {
+ boolean isNoAccess = getMembershipAttributes().getLossAction().isNoAccess();
+ boolean isLimitedAccess = getMembershipAttributes().getLossAction()
+ .isLimitedAccess();
+ if (!(isNoAccess || isLimitedAccess)) {
+ return; // early out: expiration was never affected by reliability
+ }
+
+ if (getEntryTimeToLive().getTimeout() > 0
+ && (isNoAccess || (isLimitedAccess && getEntryTimeToLive().getAction()
+ .isDistributed()))) {
+ rescheduleEntryExpiryTasks();
+ }
+ else
+ if (getEntryIdleTimeout().getTimeout() > 0
+ && (isNoAccess || (isLimitedAccess && getEntryIdleTimeout().getAction()
+ .isDistributed()))) {
+ rescheduleEntryExpiryTasks();
+ }
+ else
+ if (getCustomEntryTimeToLive() != null || getCustomEntryIdleTimeout() != null) {
+ // Force all entries to be rescheduled
+ rescheduleEntryExpiryTasks();
+ }
+
+ if (getRegionTimeToLive().getTimeout() > 0
+ && (isNoAccess || (isLimitedAccess && getRegionTimeToLive().getAction()
+ .isDistributed()))) {
+ addTTLExpiryTask();
+ }
+ if (getRegionIdleTimeout().getTimeout() > 0
+ && (isNoAccess || (isLimitedAccess && getRegionIdleTimeout()
+ .getAction().isDistributed()))) {
+ addIdleExpiryTask();
+ }
+ }
+
+ /**
+ * A boolean used to indicate if its the intialization time i.e the
+ * distributed Region is created for the first time. The variable is used at
+ * the time of lost reliablility.
+ */
+ private boolean isInitializingThread = false;
+
+ /**
+ * Called when reliability is lost. If MembershipAttributes are configured
+ * with {@link LossAction#RECONNECT}then DistributedSystem reconnect will be
+ * called asynchronously.
+ *
+ * @return true if asynchronous resumption is triggered
+ */
+ protected boolean lostReliability(final InternalDistributedMember id,
+ final Set newlyMissingRoles)
+ {
+ if (DistributedRegion.ignoreReconnect)
+ return false;
+ boolean async = false;
+ try {
+ if (getMembershipAttributes().getLossAction().isReconnect()) {
+ async = true;
+ if (isInitializingThread) {
+ doLostReliability(true, id, newlyMissingRoles);
+ }
+ else {
+ doLostReliability(false, id, newlyMissingRoles);
+ }
+ // we don't do this in the waiting pool because we're going to
+ // disconnect
+ // the distributed system, and it will wait for the pool to empty
+ /*
+ * moved to a new method called doLostReliablity. Thread t = new
+ * Thread("Reconnect Distributed System") { public void run() { try { //
+ * TODO: may need to check isReconnecting and checkReadiness...
+ * initializationLatchAfterMemberTimeout.await(); // TODO:
+ * call reconnect here
+ * getSystem().tryReconnect((GemFireCache)getCache()); // added for
+ * reconnect. synchronized (missingRequiredRoles) { // any number of
+ * threads may be waiting on missingRequiredRoles
+ * missingRequiredRoles.notifyAll(); // need to fire an event if id is
+ * not null if (hasListener() && id != null) { RoleEventImpl relEvent =
+ * new RoleEventImpl( DistributedRegion.this, Operation.CACHE_RECONNECT,
+ * null, true, id, newlyMissingRoles); dispatchListenerEvent(
+ * EnumListenerEvent.AFTER_ROLE_LOSS, relEvent); } } } catch (Exception
+ * e) { } } };
+ * t.setDaemon(true); t.start();
+ */
+ }
+ }
+ catch (CancelException cce) {
+ throw cce;
+ }
+ catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+ }
+ return async;
+ }
+
+ private void doLostReliability(boolean isInitializing,
+ final InternalDistributedMember id, final Set newlyMissingRoles)
+ {
+ try {
+ if (!isInitializing) {
+ // moved code to a new thread.
+ Thread t = new Thread(LocalizedStrings.DistributedRegion_RECONNECT_DISTRIBUTED_SYSTEM.toLocalizedString()) {
+ @Override
+ public void run()
+ {
+ try {
+ // TODO: may need to check isReconnecting and checkReadiness...
+ if (logger.isDebugEnabled()) {
+ logger.debug("Reliability loss with policy of reconnect and membership thread doing reconnect");
+ }
+ initializationLatchAfterMemberTimeout.await();
+ getSystem().tryReconnect(false, "Role Loss", getCache());
+ synchronized (missingRequiredRoles) {
+ // any number of threads may be waiting on missingRequiredRoles
+ missingRequiredRoles.notifyAll();
+ // need to fire an event if id is not null
+ if (hasListener() && id != null) {
+ RoleEventImpl relEvent = new RoleEventImpl(
+ DistributedRegion.this, Operation.CACHE_RECONNECT, null,
+ true, id, newlyMissingRoles);
+ dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS,
+ relEvent);
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+ }
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+
+ }
+ else {
+ getSystem().tryReconnect(false, "Role Loss", getCache()); // added for
+ // reconnect.
+ synchronized (missingRequiredRoles) {
+ // any number of threads may be waiting on missingRequiredRoles
+ missingRequiredRoles.notifyAll();
+ // need to fire an event if id is not null
+ if (hasListener() && id != null) {
+ RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
+ Operation.CACHE_RECONNECT, null, true, id, newlyMissingRoles);
+ dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
+ }
+ }
+ // } catch (CancelException cce){
+
+ // }
+
+ }
+ }
+ catch (CancelException ignor) {
+ throw ignor;
+ }
+ catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+ }
+
+ }
+
+ protected void lockCheckReadiness()
+ {
+ // fix for bug 32610
+ cache.getCancelCriterion().checkCancelInProgress(null);
+ checkReadiness();
+ }
+
+ @Override
+ public final Object validatedDestroy(Object key, EntryEventImpl event)
+ throws TimeoutException, EntryNotFoundException, CacheWriterException {
+ Lock dlock = this.getDistributedLockIfGlobal(key);
+ try {
+ return super.validatedDestroy(key, event);
+ } finally {
+ if (dlock != null) {
+ dlock.unlock();
+ }
+ }
+ }
+
+ /**
+ * @see LocalRegion#localDestroyNoCallbacks(Object)
+ */
+ @Override
+ public void localDestroyNoCallbacks(Object key)
+ {
+ super.localDestroyNoCallbacks(key);
+ if (getScope().isGlobal()) {
+ try {
+ this.getLockService().freeResources(key);
+ }
+ catch (LockServiceDestroyedException ignore) {
+ }
+ }
+ }
+
+ /**
+ * @see LocalRegion#localDestroy(Object, Object)
+ */
+ @Override
+ public void localDestroy(Object key, Object aCallbackArgument)
+ throws EntryNotFoundException
+ {
+ super.localDestroy(key, aCallbackArgument);
+ if (getScope().isGlobal()) {
+ try {
+ this.getLockService().freeResources(key);
+ }
+ catch (LockServiceDestroyedException ignore) {
+ }
+ }
+ }
+
+ /**
+ * @see LocalRegion#invalidate(Object, Object)
+ */
+ @Override
+ public void invalidate(Object key, Object aCallbackArgument)
+ throws TimeoutException, EntryNotFoundException
+ {
+ validateKey(key);
+ validateCallbackArg(aCallbackArgument);
+ checkReadiness();
+ checkForLimitedOrNoAccess();
+ Lock dlock = this.getDistributedLockIfGlobal(key);
+ try {
+ super.validatedInvalidate(key, aCallbackArgument);
+ }
+ finally {
+ if (dlock != null)
+ dlock.unlock();
+ }
+ }
+
+ @Override
+ public Lock getRegionDistributedLock() throws IllegalStateException
+ {
+ lockCheckReadiness();
+ checkForLimitedOrNoAccess();
+ if (!this.scope.isGlobal()) {
+ throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
+ }
+ return new RegionDistributedLock();
+ }
+
+ @Override
+ public Lock getDistributedLock(Object key) throws IllegalStateException
+ {
+ validateKey(key);
+ lockCheckReadiness();
+ checkForLimitedOrNoAccess();
+ if (!this.scope.isGlobal()) {
+ throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
+ }
+ if (isLockingSuspendedByCurrentThread()) {
+ throw new IllegalStateException(LocalizedStrings.DistributedRegion_THIS_THREAD_HAS_SUSPENDED_ALL_LOCKING_FOR_THIS_REGION.toLocalizedString());
+ }
+ return new DistributedLock(key);
+ }
+
+ /**
+ * Called while NOT holding lock on parent's subregions
+ *
+ * @throws IllegalStateException
+ * if region is not compatible with a region in another VM.
+ *
+ * @see LocalRegion#initialize(InputStream, InternalDistributedMember, InternalRegionArguments)
+ */
+ @Override
+ protected void initialize(InputStream snapshotInputStream,
+ InternalDistributedMember imageTarget, InternalRegionArguments internalRegionArgs) throws TimeoutException,
+ IOException, ClassNotFoundException
+ {
+ Assert.assertTrue(!isInitialized());
+ if (logger.isDebugEnabled()) {
+ logger.debug("DistributedRegion.initialize BEGIN: {}", getFullPath());
+ }
+
+ // if we're versioning entries we need a region-level version vector
+ if (this.scope.isDistributed() && this.concurrencyChecksEnabled) {
+ createVersionVector();
+ }
+
+ if (this.scope.isGlobal()) {
+ getLockService(); // create lock service eagerly now
+ }
+
+ final IndexUpdater indexUpdater = getIndexUpdater();
+ boolean sqlfGIILockTaken = false;
+ // this try block is to release the SQLF GII lock in finally
+ // which should be done after bucket status will be set
+ // properly in LocalRegion#initialize()
+ try {
+ try {
+ try {
+ // take the GII lock to avoid missing entries while updating the
+ // index list for SQLFabric (#41330 and others)
+ if (indexUpdater != null) {
+ indexUpdater.lockForGII();
+ sqlfGIILockTaken = true;
+ }
+
+ PersistentMemberID persistentId = null;
+ boolean recoverFromDisk = isRecoveryNeeded();
+ DiskRegion dskRgn = getDiskRegion();
+ if (recoverFromDisk) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("DistributedRegion.getInitialImageAndRecovery: Starting Recovery");
+ }
+ dskRgn.initializeOwner(this); // do recovery
+ if (logger.isDebugEnabled()) {
+ logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery");
+ }
+ persistentId = dskRgn.getMyPersistentID();
+ }
+
+ // Create OQL indexes before starting GII.
+ createOQLIndexes(internalRegionArgs, recoverFromDisk);
+
+ if (getDataPolicy().withReplication()
+ || getDataPolicy().withPreloaded()) {
+ getInitialImageAndRecovery(snapshotInputStream, imageTarget,
+ internalRegionArgs, recoverFromDisk, persistentId);
+ }
+ else {
+ new CreateRegionProcessor(this).initializeRegion();
+ if (snapshotInputStream != null) {
+ releaseBeforeGetInitialImageLatch();
+ loadSnapshotDuringInitialization(snapshotInputStream);
+ }
+ }
+ }
+ catch (DiskAccessException dae) {
+ this.handleDiskAccessException(dae, true);
+ throw dae;
+ }
+
+ initMembershipRoles();
+ isInitializingThread = false;
+ super.initialize(null, null, null); // makes sure all latches are released if they haven't been already
+ } finally {
+ if (this.eventTracker != null) {
+ this.eventTracker.setInitialized();
+ }
+ }
+ } finally {
+ if (sqlfGIILockTaken) {
+ indexUpdater.unlockForGII();
+ }
+ }
+ }
+
+ @Override
+ public void initialized() {
+ new UpdateAttributesProcessor(this).distribute(false);
+ }
+
+ /** True if GII was impacted by missing required roles */
+ private boolean giiMissingRequiredRoles = false;
+
+ /**
+ * A reference counter to protected the memoryThresholdReached boolean
+ */
+ private final Set<DistributedMember> memoryThresholdReachedMembers =
+ new CopyOnWriteArraySet<DistributedMember>();
+
+ private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
+ /** Sets and returns giiMissingRequiredRoles */
+ private boolean checkInitialImageForReliability(
+ InternalDistributedMember imageTarget,
+ CacheDistributionAdvisor.InitialImageAdvice advice)
+ {
+ // assumption: required roles are interesting to GII only if Reinitialize...
+// if (true)
+ return false;
+// if (getMembershipAttributes().hasRequiredRoles()
+// && getMembershipAttributes().getResumptionAction().isReinitialize()) {
+// // are any required roles missing for GII with Reinitialize?
+// Set missingRR = new HashSet(getMembershipAttributes().getRequiredRoles());
+// missingRR.removeAll(getSystem().getDistributedMember().getRoles());
+// for (Iterator iter = advice.replicates.iterator(); iter.hasNext();) {
+// DistributedMember member = (DistributedMember)iter.next();
+// missingRR.removeAll(member.getRoles());
+// }
+// for (Iterator iter = advice.others.iterator(); iter.hasNext();) {
+// DistributedMember member = (DistributedMember)iter.next();
+// missingRR.removeAll(member.getRoles());
+// }
+// for (Iterator iter = advice.preloaded.iterator(); iter.hasNext();) {
+// DistributedMember member = (DistributedMember)iter.next();
+// missingRR.removeAll(member.getRoles());
+// }
+// if (!missingRR.isEmpty()) {
+// // entering immediate loss condition, which will cause reinit on resume
+// this.giiMissingRequiredRoles = true;
+// }
+// }
+// return this.giiMissingRequiredRoles;
+ }
+
+ private void getInitialImageAndRecovery(InputStream snapshotInputStream,
+ InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs,
+ boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException
+ {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_0, this.getName()));
+
+ ImageState imgState = getImageState();
+ imgState.init();
+ boolean targetRecreated = internalRegionArgs.getRecreateFlag();
+ Boolean isCBool = (Boolean)isConversion.get();
+ boolean isForConversion = isCBool!=null?isCBool.booleanValue():false;
+
+ if (recoverFromDisk && snapshotInputStream != null && !isForConversion) {
+ throw new InternalGemFireError(LocalizedStrings.DistributedRegion_IF_LOADING_A_SNAPSHOT_THEN_SHOULD_NOT_BE_RECOVERING_ISRECOVERING_0_SNAPSHOTSTREAM_1.toLocalizedString(new Object[] {Boolean.valueOf(recoverFromDisk), snapshotInputStream}));
+ }
+
+ ProfileExchangeProcessor targetProvider;
+ if (dataPolicy.withPersistence()) {
+ targetProvider = new CreatePersistentRegionProcessor(this,
+ getPersistenceAdvisor(), recoverFromDisk);
+ }
+ else {
+ // this will go in the advisor profile
+ targetProvider = new CreateRegionProcessor(this);
+ }
+ imgState.setInRecovery(false);
+ RegionVersionVector recovered_rvv = null;
+ if (dataPolicy.withPersistence()) {
+ recovered_rvv = (this.getVersionVector()==null?null:this.getVersionVector().getCloneForTransmission());
+ }
+ // initializeRegion will send out our profile
+ targetProvider.initializeRegion();
+
+ if(persistenceAdvisor != null) {
+ persistenceAdvisor.initialize();
+ }
+
+ // Register listener here so that the remote members are known
+ // since registering calls initializeCriticalMembers (which needs to know about
+ // remote members
+ if (!isInternalRegion()) {
+ if (!this.isDestroyed) {
+ cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
+ }
+ }
+
+ releaseBeforeGetInitialImageLatch();
+
+ // allow GII to invoke test hooks. Do this just after releasing the
+ // before-gii latch for bug #48962. See ConcurrentLeaveDuringGIIDUnitTest
+ InitialImageOperation.beforeGetInitialImage(this);
+
+ if (snapshotInputStream != null) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("DistributedRegion.getInitialImageAndRecovery: About to load snapshot, isInitialized={}; {}",
+ isInitialized(), getFullPath());
+ }
+ loadSnapshotDuringInitialization(snapshotInputStream);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e); // @todo change this exception?
+ }
+ catch (ClassNotFoundException e) {
+ throw new RuntimeException(e); // @todo change this exception?
+ }
+ cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
+ return;
+ }
+
+ // No snapshot provided, use the imageTarget(s)
+
+ // if we were given a recommended imageTarget, use that first, and
+ // treat it like it is a replicate (regardless of whether it actually is
+ // or not)
+
+ InitialImageOperation iiop = new InitialImageOperation(this, this.entries);
+ // [defunct] Special case GII for PR admin regions (which are always
+ // replicates and always writers
+ // bruce: this was commented out after adding the GIIAckRequest logic to
+ // force
+ // consistency before the gii operation begins
+ // if (isUsedForPartitionedRegionAdmin() ||
+ // isUsedForPartitionedRegionBucket()) {
+ // releaseBeforeGetInitialImageLatch();
+ // iiop.getFromAll(this.distAdvisor.adviseGeneric(), false);
+ // cleanUpDestroyedTokens();
+ // return;
+ // }
+
+
+ CacheDistributionAdvisor.InitialImageAdvice advice = null;
+ boolean done = false;
+ while(!done && !isDestroyed()) {
+ advice = targetProvider.getInitialImageAdvice(advice);
+ checkInitialImageForReliability(imageSrc, advice);
+ boolean attemptGetFromOne =
+ imageSrc != null // we were given a specific member
+ || this.dataPolicy.withPreloaded()
+ && !advice.preloaded.isEmpty() // this is a preloaded region
+ || (!advice.replicates.isEmpty());
+ // That is: if we have 0 or 1 giiProvider then we can do a getFromOne gii;
+ // if we have 2 or more giiProviders then we must do a getFromAll gii.
+
+ if (attemptGetFromOne) {
+ if (recoverFromDisk) {
+ if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
+ CacheObserverHolder.getInstance().afterMarkingGIIStarted();
+ }
+ }
+ {
+ // If we have an imageSrc and the target is reinitializing mark the
+ // getInitialImage so that it will wait until the target region is fully initialized
+ // before responding to the get image request. Otherwise, the
+ // source may respond with no data because it is still initializing,
+ // e.g. loading a snapshot.
+
+ // Plan A: use specified imageSrc, if specified
+ if (imageSrc != null) {
+ try {
+ GIIStatus ret = iiop.getFromOne(Collections.singleton(imageSrc),
+ targetRecreated, advice, recoverFromDisk, recovered_rvv);
+ if (GIIStatus.didGII(ret)) {
+ this.giiMissingRequiredRoles = false;
+ cleanUpDestroyedTokensAndMarkGIIComplete(ret);
+ done = true;
+ return;
+ }
+ } finally {
+ imageSrc = null;
+ }
+ }
+
+ // Plan C: use a replicate, if one exists
+ GIIStatus ret = iiop.getFromOne(advice.replicates, false, advice, recoverFromDisk, recovered_rvv);
+ if (GIIStatus.didGII(ret)) {
+ cleanUpDestroyedTokensAndMarkGIIComplete(ret);
+ done = true;
+ return;
+ }
+
+ // Plan D: if this is a PRELOADED region, fetch from another PRELOADED
+ if (this.dataPolicy.isPreloaded()) {
+ GIIStatus ret_preload = iiop.getFromOne(advice.preloaded, false, advice, recoverFromDisk, recovered_rvv);
+ if (GIIStatus.didGII(ret_preload)) {
+ cleanUpDestroyedTokensAndMarkGIIComplete(ret_preload);
+ done = true;
+ return;
+ }
+ } // isPreloaded
+ }
+
+ //If we got to this point, we failed in the GII. Cleanup
+ //any partial image we received
+ cleanUpAfterFailedGII(recoverFromDisk);
+
+ } // attemptGetFromOne
+ else {
+ if(!isDestroyed()) {
+ if(recoverFromDisk) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZED_FROM_DISK,
+ new Object[] {this.getFullPath(), persistentId, getPersistentID()}));
+ if(persistentId != null) {
+ RegionLogger.logRecovery(this.getFullPath(), persistentId,
+ getDistributionManager().getDistributionManagerId());
+ }
+ } else {
+ RegionLogger.logCreate(this.getFullPath(),
+ getDistributionManager().getDistributionManagerId());
+
+ if (getPersistentID() != null) {
+ RegionLogger.logPersistence(this.getFullPath(),
+ getDistributionManager().getDistributionManagerId(),
+ getPersistentID());
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_NEW_PERSISTENT_REGION_CREATED,
+ new Object[] {this.getFullPath(), getPersistentID()}));
+ }
+ }
+
+ /* no more union GII
+ // do union getInitialImage
+ Set rest = new HashSet();
+ rest.addAll(advice.others);
+ rest.addAll(advice.preloaded);
+ // push profile w/ recovery flag turned off at same time that we
+ // do a union getInitialImage
+ boolean pushProfile = recoverFromDisk;
+ iiop.getFromAll(rest, pushProfile);
+ */
+ cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
+ done = true;
+ return;
+ }
+ break;
+ }
+ }
+
+ return;
+ }
+
+ private void synchronizeWith(InternalDistributedMember target,
+ VersionSource idToRecover) {
+ InitialImageOperation op = new InitialImageOperation(this, this.entries);
+ op.synchronizeWith(target, idToRecover, null);
+ }
+
+ /**
+ * If this region has concurrency controls enabled this will pull any missing
+ * changes from other replicates using InitialImageOperation and a filtered
+ * chunking protocol.
+ */
+ public void synchronizeForLostMember(InternalDistributedMember
+ lostMember, VersionSource lostVersionID) {
+ if (this.concurrencyChecksEnabled == false) {
+ return;
+ }
+ CacheDistributionAdvisor advisor = getCacheDistributionAdvisor();
+ Set<InternalDistributedMember> targets = advisor.adviseInitializedReplicates();
+ for (InternalDistributedMember target: targets) {
+ synchronizeWith(target, lostVersionID, lostMember);
+ }
+ }
+
+ /**
+ * synchronize with another member wrt messages from the given "lost" member.
+ * This can be used when a primary bucket crashes to ensure that interrupted
+ * message distribution is mended.
+ */
+ private void synchronizeWith(InternalDistributedMember target,
+ VersionSource versionMember, InternalDistributedMember lostMember) {
+ InitialImageOperation op = new InitialImageOperation(this, this.entries);
+ op.synchronizeWith(target, versionMember, lostMember);
+ }
+
+ /**
+ * invoked just before an initial image is requested from another member
+ */
+ /** remove any partial entries received in a failed GII */
+ protected void cleanUpAfterFailedGII(boolean recoverFromDisk) {
+ DiskRegion dskRgn = getDiskRegion();
+ //if we have a persistent region, instead of deleting everything on disk,
+ //we will just reset the "recovered from disk" flag. After
+ //the next GII we will delete these entries if they do not come
+ //in as part of the GII.
+ if (recoverFromDisk && dskRgn != null && dskRgn.isBackup()) {
+ dskRgn.resetRecoveredEntries(this);
+ return;
+ }
+
+ if (!this.entries.isEmpty()) {
+ closeEntries();
+ if (getDiskRegion() != null) {
+ getDiskRegion().clear(this, null);
+ }
+ // clear the left-members and version-tags sets in imageState
+ getImageState().getLeftMembers();
+ getImageState().getVersionTags();
+ // Clear OQL indexes
+ if (this.indexManager != null) {
+ try {
+ this.indexManager.rerunIndexCreationQuery();
+ } catch (Exception ex){
+ if (logger.isDebugEnabled()) {
+ logger.debug("Exception while clearing indexes after GII failure.", ex);
+ }
+ }
+ }
+ }
+ }
+
+ private void initMembershipRoles()
+ {
+ synchronized (this.advisorListener) {
+ // hold sync to prevent listener from changing initial members
+ Set others = this.distAdvisor
+ .addMembershipListenerAndAdviseGeneric(this.advisorListener);
+ this.advisorListener.addMembers(others);
+ // initialize missing required roles with initial member info
+ if (getMembershipAttributes().hasRequiredRoles()) {
+ // AdvisorListener will also sync on missingRequiredRoles
+ synchronized (this.missingRequiredRoles) {
+ this.missingRequiredRoles.addAll(getMembershipAttributes()
+ .getRequiredRoles());
+ // remove all the roles we are playing since they will never be
+ // missing
+ this.missingRequiredRoles.removeAll(getSystem()
+ .getDistributedMember().getRoles());
+ for (Iterator iter = others.iterator(); iter.hasNext();) {
+ DistributedMember other = (DistributedMember)iter.next();
+ this.missingRequiredRoles.removeAll(other.getRoles());
+ }
+ }
+ }
+ }
+ if (getMembershipAttributes().hasRequiredRoles()) {
+ // wait up to memberTimeout for required roles...
+// boolean requiredRolesAreMissing = false;
+ int memberTimeout = getSystem().getConfig().getMemberTimeout();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting up to {} for required roles.", memberTimeout);
+ }
+ try {
+ if (this.giiMissingRequiredRoles) {
+ // force reliability loss and possibly resumption
+ isInitializingThread = true;
+ synchronized (this.advisorListener) {
+ synchronized (this.missingRequiredRoles) {
+ // forcing state of loss because of bad GII
+ this.isMissingRequiredRoles = true;
+ getCachePerfStats().incReliableRegionsMissing(1);
+ if (getMembershipAttributes().getLossAction().isAllAccess())
+ getCachePerfStats().incReliableRegionsMissingFullAccess(1); // rahul
+ else if (getMembershipAttributes().getLossAction()
+ .isLimitedAccess())
+ getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
+ else if (getMembershipAttributes().getLossAction().isNoAccess())
+ getCachePerfStats().incReliableRegionsMissingNoAccess(1);
+ // pur code to increment the stats.
+ if (logger.isDebugEnabled()) {
+ logger.debug("GetInitialImage had missing required roles.");
+ }
+ // TODO: will this work with RECONNECT and REINITIALIZE?
+ isInitializingThread = true;
+ lostReliability(null, null);
+ if (this.missingRequiredRoles.isEmpty()) {
+ // all required roles are present so force resumption
+ this.isMissingRequiredRoles = false;
+ getCachePerfStats().incReliableRegionsMissing(-1);
+ if (getMembershipAttributes().getLossAction().isAllAccess())
+ getCachePerfStats().incReliableRegionsMissingFullAccess(-1); // rahul
+ else if (getMembershipAttributes().getLossAction()
+ .isLimitedAccess())
+ getCachePerfStats()
+ .incReliableRegionsMissingLimitedAccess(-1);
+ else if (getMembershipAttributes().getLossAction().isNoAccess())
+ getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
+ // pur code to increment the stats.
+ boolean async = resumeReliability(null, null);
+ if (async) {
+ advisorListener.destroyed = true;
+ }
+ }
+ }
+ }
+ }
+ else {
+ if (!getSystem().isLoner()) {
+ waitForRequiredRoles(memberTimeout);
+ }
+ synchronized (this.advisorListener) {
+ synchronized (this.missingRequiredRoles) {
+ if (this.missingRequiredRoles.isEmpty()) {
+ Assert.assertTrue(!this.isMissingRequiredRoles);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Initialization completed with all required roles present.");
+ }
+ }
+ else {
+ // starting in state of loss...
+ this.isMissingRequiredRoles = true;
+ getCachePerfStats().incReliableRegionsMissing(1);
+ if (getMembershipAttributes().getLossAction().isAllAccess())
+ getCachePerfStats().incReliableRegionsMissingFullAccess(1); // rahul
+ else if (getMembershipAttributes().getLossAction()
+ .isLimitedAccess())
+ getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
+ else if (getMembershipAttributes().getLossAction().isNoAccess())
+ getCachePerfStats().incReliableRegionsMissingNoAccess(1);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Initialization completed with missing required roles: {}", this.missingRequiredRoles);
+ }
+ isInitializingThread = true;
+ lostReliability(null, null);
+ }
+ }
+ }
+ }
+ }
+ catch (RegionDestroyedException ignore) {
+ // ignore to fix bug 34639 may be thrown by waitForRequiredRoles
+ }
+ catch (CancelException ignore) {
+ // ignore to fix bug 34639 may be thrown by waitForRequiredRoles
+ if (isInitializingThread) {
+ throw ignore;
+ }
+ }
+ catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+ }
+
+ }
+ // open latch which will allow any threads in lostReliability to proceed
+ this.initializationLatchAfterMemberTimeout.countDown();
+ }
+ private boolean isRecoveryNeeded() {
+ return getDataPolicy().withPersistence()
+ && getDiskRegion().isRecreated();
+ }
+
+ // called by InitialImageOperation to clean up destroyed tokens
+ // release afterGetInitialImageInitializationLatch before unlocking
+ // cleanUpLock
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK")
+ private void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus)
+ {
+ //We need to clean up the disk before we release the after get initial image latch
+ DiskRegion dskRgn = getDiskRegion();
+ if (dskRgn != null && dskRgn.isBackup()) {
+ dskRgn.finishInitializeOwner(this, giiStatus);
+ }
+ ImageState is = getImageState();
+ is.lockGII();
+ // clear the version tag and left-members sets
+ is.getVersionTags();
+ is.getLeftMembers();
+ // remove DESTROYED tokens
+ RegionVersionVector rvv = is.getClearRegionVersionVector();
+ try {
+ Iterator/*<Object>*/ keysIt = getImageState().getDestroyedEntries();
+ while (keysIt.hasNext()) {
+ this.entries.removeIfDestroyed(keysIt.next());
+ }
+ if (rvv != null) {
+ // clear any entries received in the GII that are older than the RVV versions.
+ // this can happen if entry chunks were received prior to the clear() being
+ // processed
+ clearEntries(rvv);
+ }
+ //need to do this before we release the afterGetInitialImageLatch
+ if(persistenceAdvisor != null) {
+ persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
+ }
+ }
+ finally {
+ // release after gii lock first so basicDestroy will see isInitialized()
+ // be true
+ // when they get the cleanUp lock.
+ try {
+ releaseAfterGetInitialImageLatch();
+ } finally { // make sure unlockGII is done for bug 40001
+ is.unlockGII();
+ }
+ }
+
+ if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
+ CacheObserverHolder.getInstance().afterMarkingGIICompleted();
+ }
+
+ //"Initializing region {0}" which is not acompanied by a completed message. Users think thread is stuck in some operation. Hence adding this log
+ logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_COMPLETED_0, this.getName()));
+ }
+
+ /**
+ * @see LocalRegion#basicDestroy(EntryEventImpl, boolean, Object)
+ */
+ @Override
+ protected
+ void basicDestroy(EntryEventImpl event,
+ boolean cacheWrite,
+ Object expectedOldValue)
+ throws EntryNotFoundException, CacheWriterException, TimeoutException {
+ // disallow local destruction for mirrored keysvalues regions
+ boolean invokeWriter = cacheWrite;
+ boolean hasSeen = false;
+ if (hasSeenEvent(event)) {
+ hasSeen = true;
+ }
+ checkIfReplicatedAndLocalDestroy(event);
+
+ try {
+ if (this.requiresOneHopForMissingEntry(event)) {
+ // bug #45704: see if a one-hop must be done for this operation
+ RegionEntry re = getRegionEntry(event.getKey());
+ if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
+ if (this.srp == null) {
+ // only assert for non-client regions.
+ Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
+ }
+ if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
+ // removeAll will send a single one-hop for empty regions. for other missing entries
+ // we need to get a valid version number before modifying the local cache
+ // TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
+ boolean didDistribute = RemoteDestroyMessage.distribute(event, expectedOldValue, !this.generateVersionTag);
+
+ if (!this.generateVersionTag && !didDistribute) {
+ throw new PersistentReplicatesOfflineException();
+ }
+
+ if (didDistribute) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Event after remoteDestroy operation: {}", event);
+ }
+ invokeWriter = false; // remote cache invoked the writer
+ if (event.getVersionTag() == null) {
+ // if the event wasn't applied by the one-hop replicate it will not have a version tag
+ // and so should not be applied to this cache
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ super.basicDestroy(event, invokeWriter, expectedOldValue);
+
+ // if this is a destroy coming in from remote source, free up lock resources
+ // if this is a local origin destroy, this will happen after lock is
+ // released
+ if (this.scope.isGlobal() && event.isOriginRemote()) {
+ try {
+ getLockService().freeResources(event.getKey());
+ }
+ catch (LockServiceDestroyedException ignore) {
+ }
+ }
+
+ return;
+ }
+ finally {
+ if (hasSeen) {
+ if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
+ event.getRemoveAllOperation().addEntry(event, true);
+ }
- distributeDestroy(event, expectedOldValue);
- event.invokeCallbacks(this,true, false);
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ distributeDestroy(event, expectedOldValue);
++ event.invokeCallbacks(this,true, false);
++ }
+ }
+ }
+ }
+
+ @Override
+ void basicDestroyPart3(RegionEntry re, EntryEventImpl event,
+ boolean inTokenMode, boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
+
+ distributeDestroy(event, expectedOldValue);
+ super.basicDestroyPart3(re, event, inTokenMode, duringRI, invokeCallbacks, expectedOldValue);
+ }
+
+ void distributeDestroy(EntryEventImpl event, Object expectedOldValue) {
+ if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
+ boolean distribute = !event.getInhibitDistribution();
+ if (distribute) {
+ DestroyOperation op = new DestroyOperation(event);
+ op.distribute();
+ }
+ }
+ }
+
+ @Override
+ boolean evictDestroy(LRUEntry entry) {
+ boolean evictDestroyWasDone = super.evictDestroy(entry);
+ if (evictDestroyWasDone) {
+ if (this.scope.isGlobal()) {
+ try {
+ getLockService().freeResources(entry.getKey());
+ }
+ catch (LockServiceDestroyedException ignore) {
+ }
+ }
+ }
+ return evictDestroyWasDone;
+ }
+
+
+ /**
+ * @see LocalRegion#basicInvalidateRegion(RegionEventImpl)
+ */
+ @Override
+ void basicInvalidateRegion(RegionEventImpl event)
+ {
+ // disallow local invalidation for replicated regions
+ if (!event.isDistributed() && getScope().isDistributed()
+ && getDataPolicy().withReplication()) {
+ throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
+ }
+ if (shouldDistributeInvalidateRegion(event)) {
+ distributeInvalidateRegion(event);
+ }
+ super.basicInvalidateRegion(event);
+ }
+
+ /**
+ * decide if InvalidateRegionOperation should be sent to peers. broken out so
+ * that BucketRegion can override
+ * @param event
+ * @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise
+ */
+ protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
+ return event.isDistributed() && !event.isOriginRemote();
+ }
+
+ /**
+ * Distribute the invalidate of a region given its event.
+ * This implementation sends the invalidate to peers.
+ * @since 5.7
+ */
+ protected void distributeInvalidateRegion(RegionEventImpl event) {
+ new InvalidateRegionOperation(event).distribute();
+ }
+
+ /**
+ * @see LocalRegion#basicDestroyRegion(RegionEventImpl, boolean, boolean,
+ * boolean)
+ */
+ @Override
+ void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite,
+ boolean lock, boolean callbackEvents) throws CacheWriterException,
+ TimeoutException
+ {
+ final String path = getFullPath();
+ //Keep track of regions that are being destroyed. This helps avoid a race
+ //when another member concurrently creates this region. See bug 42051.
+ boolean isClose = event.getOperation().isClose();
+ if(!isClose) {
+ cache.beginDestroy(path, this);
+ }
+ try {
+ super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents);
+ // send destroy region operation even if this is a localDestroyRegion (or
+ // close)
+ if (!event.isOriginRemote()) {
+ distributeDestroyRegion(event, true);
+ } else {
+ if(!event.isReinitializing()) {
+ RegionEventImpl localEvent = new RegionEventImpl(this,
+ Operation.REGION_LOCAL_DESTROY, event.getCallbackArgument(), false, getMyId(),
+ generateEventID()/* generate EventID */);
+ distributeDestroyRegion(localEvent, false/*fixes bug 41111*/);
+ }
+ }
+ notifyBridgeClients(event);
+ }
+ catch (CancelException e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("basicDestroyRegion short-circuited due to cancellation");
+ }
+ }
+ finally {
+ if(!isClose) {
+ cache.endDestroy(path, this);
+ }
+ RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose);
+ }
+ }
+
+
+ @Override
+ protected void distributeDestroyRegion(RegionEventImpl event,
+ boolean notifyOfRegionDeparture) {
+ if(persistenceAdvisor != null) {
+ persistenceAdvisor.releaseTieLock();
+ }
+ new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
+ }
+
+ /**
+ * Return true if invalidation occurred; false if it did not, for example if
+ * it was already invalidated
+ *
+ * @see LocalRegion#basicInvalidate(EntryEventImpl)
+ */
+ @Override
+ void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException
+ {
+
+ boolean hasSeen = false;
+ if (hasSeenEvent(event)) {
+ hasSeen = true;
+ }
+ try {
+ // disallow local invalidation for replicated regions
+ if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed()
+ && getDataPolicy().withReplication()) {
+ throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
+ }
+ if (this.requiresOneHopForMissingEntry(event)) {
+ // bug #45704: see if a one-hop must be done for this operation
+ RegionEntry re = getRegionEntry(event.getKey());
+ if (re == null/* || re.isTombstone()*/ || !this.generateVersionTag) {
+ if (this.srp == null) {
+ // only assert for non-client regions.
+ Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
+ }
+ // TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
+ boolean didDistribute = RemoteInvalidateMessage.distribute(event, !this.generateVersionTag);
+ if (!this.generateVersionTag && !didDistribute) {
+ throw new PersistentReplicatesOfflineException();
+ }
+ if (didDistribute) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Event after remoteInvalidate operation: {}", event);
+ }
+ if (event.getVersionTag() == null) {
+ // if the event wasn't applied by the one-hop replicate it will not have a version tag
+ // and so should not be applied to this cache
+ return;
+ }
+ }
+ }
+ }
+
+ super.basicInvalidate(event);
+
+ return;
+ } finally {
+ if (hasSeen) {
- distributeInvalidate(event);
- event.invokeCallbacks(this,true, false);
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ distributeInvalidate(event);
++ event.invokeCallbacks(this,true, false);
++ }
+ }
+ }
+ }
+
+ @Override
+ void basicInvalidatePart3(RegionEntry re, EntryEventImpl event,
+ boolean invokeCallbacks) {
+ distributeInvalidate(event);
+ super.basicInvalidatePart3(re, event, invokeCallbacks);
+ }
+
+ void distributeInvalidate(EntryEventImpl event) {
+ if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
+ && !isTX() /* only distribute if non-tx */) {
+ if (event.isDistributed() && !event.isOriginRemote()) {
+ boolean distribute = !event.getInhibitDistribution();
+ if (distribute) {
+ InvalidateOperation op = new InvalidateOperation(event);
+ op.distribute();
+ }
+ }
+ }
+ }
+
+
+ @Override
+ void basicUpdateEntryVersion(EntryEventImpl event)
+ throws EntryNotFoundException {
+
+ try {
+ if (!hasSeenEvent(event)) {
+ super.basicUpdateEntryVersion(event);
+ }
+ return;
+ } finally {
- distributeUpdateEntryVersion(event);
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ distributeUpdateEntryVersion(event);
++ }
+ }
+ }
+
+ private void distributeUpdateEntryVersion(EntryEventImpl event) {
+ if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
+ && !isTX() /* only distribute if non-tx */) {
+ if (event.isDistributed() && !event.isOriginRemote()) {
+ UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
+ op.distribute();
+ }
+ }
+ }
+
+ @Override
+ protected void basicClear(RegionEventImpl ev)
+ {
+ Lock dlock = this.getRegionDistributedLockIfGlobal();
+ try {
+ super.basicClear(ev);
+ }
+ finally {
+ if (dlock != null)
+ dlock.unlock();
+ }
+ }
+
+ @Override
+ void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+ if (this.concurrencyChecksEnabled && !this.dataPolicy.withReplication()) {
+ boolean retry = false;
+ do {
+ // non-replicate regions must defer to a replicate for clear/invalidate of region
+ Set<InternalDistributedMember> repls = this.distAdvisor.adviseReplicates();
+ if (repls.size() > 0) {
+ InternalDistributedMember mbr = repls.iterator().next();
+ RemoteRegionOperation op = RemoteRegionOperation.clear(mbr, this);
+ try {
+ op.distribute();
+ return;
+ } catch (CancelException e) {
+ this.stopper.checkCancelInProgress(e);
+ retry = true;
+ } catch (RemoteOperationException e) {
+ this.stopper.checkCancelInProgress(e);
+ retry = true;
+ }
+ }
+ } while (retry);
+ }
+ // if no version vector or if no replicates are around, use the default mechanism
+ super.basicClear(regionEvent, cacheWrite);
+ }
+
+
+ @Override
+ void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+ boolean enableRVV = useRVV && this.dataPolicy.withReplication() && this.concurrencyChecksEnabled && !getDistributionManager().isLoner();
+
+ //Fix for 46338 - apparently multiple threads from the same VM are allowed
+ //to suspend locking, which is what distributedLockForClear() does. We don't
+ //want that to happen, so we'll synchronize to make sure only one thread on
+ //this member performs a clear.
+ synchronized(clearLock) {
+ if (enableRVV) {
+
+ distributedLockForClear();
+ try {
+ Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
+ // pause all generation of versions and flush from the other members to this one
+ try {
+ obtainWriteLocksForClear(regionEvent, participants);
+ clearRegionLocally(regionEvent, cacheWrite, null);
+ if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
+ DistributedClearOperation.clear(regionEvent, null, participants);
+ }
+ } finally {
+ releaseWriteLocksForClear(regionEvent, participants);
+ }
+ }
+ finally {
+ distributedUnlockForClear();
+ }
+ } else {
+ Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
+ clearRegionLocally(regionEvent, cacheWrite, null);
+ if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
+ DistributedClearOperation.clear(regionEvent, null, participants);
+ }
+ }
+ }
+
+ // since clients do not maintain RVVs except for tombstone GC
+ // we need to ensure that current ops
<TRUNCATED>