You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:43:21 UTC

[033/100] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 92b585a,0000000..f3e730a
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@@ -1,4303 -1,0 +1,4311 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.gemstone.gemfire.internal.cache;
 +
 +import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
 +
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.CopyOnWriteArraySet;
 +import java.util.concurrent.RejectedExecutionException;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.locks.Condition;
 +import java.util.concurrent.locks.Lock;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.CancelException;
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.InvalidDeltaException;
 +import com.gemstone.gemfire.SystemFailure;
 +import com.gemstone.gemfire.cache.CacheClosedException;
 +import com.gemstone.gemfire.cache.CacheListener;
 +import com.gemstone.gemfire.cache.CacheLoader;
 +import com.gemstone.gemfire.cache.CacheLoaderException;
 +import com.gemstone.gemfire.cache.CacheWriter;
 +import com.gemstone.gemfire.cache.CacheWriterException;
 +import com.gemstone.gemfire.cache.DataPolicy;
 +import com.gemstone.gemfire.cache.DiskAccessException;
 +import com.gemstone.gemfire.cache.EntryNotFoundException;
 +import com.gemstone.gemfire.cache.LossAction;
 +import com.gemstone.gemfire.cache.MembershipAttributes;
 +import com.gemstone.gemfire.cache.Operation;
 +import com.gemstone.gemfire.cache.RegionAccessException;
 +import com.gemstone.gemfire.cache.RegionAttributes;
 +import com.gemstone.gemfire.cache.RegionDestroyedException;
 +import com.gemstone.gemfire.cache.RegionDistributionException;
 +import com.gemstone.gemfire.cache.RegionMembershipListener;
 +import com.gemstone.gemfire.cache.ResumptionAction;
 +import com.gemstone.gemfire.cache.RoleException;
 +import com.gemstone.gemfire.cache.TimeoutException;
 +import com.gemstone.gemfire.cache.TransactionId;
 +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 +import com.gemstone.gemfire.cache.execute.Function;
 +import com.gemstone.gemfire.cache.execute.FunctionException;
 +import com.gemstone.gemfire.cache.execute.ResultCollector;
 +import com.gemstone.gemfire.cache.execute.ResultSender;
 +import com.gemstone.gemfire.cache.persistence.PersistentReplicatesOfflineException;
 +import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
 +import com.gemstone.gemfire.cache.wan.GatewaySender;
 +import com.gemstone.gemfire.distributed.DistributedLockService;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
 +import com.gemstone.gemfire.distributed.Role;
 +import com.gemstone.gemfire.distributed.internal.DM;
 +import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
 +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
 +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
 +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.ProfileVisitor;
 +import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 +import com.gemstone.gemfire.distributed.internal.MembershipListener;
 +import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
 +import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
 +import com.gemstone.gemfire.distributed.internal.locks.DLockService;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.internal.Assert;
 +import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
 +import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
 +import com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
 +import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
 +import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
 +import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
 +import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
 +import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultWaiter;
 +import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
 +import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
 +import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
 +import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
 +import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
 +import com.gemstone.gemfire.internal.cache.persistence.CreatePersistentRegionProcessor;
 +import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisor;
 +import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisorImpl;
 +import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
 +import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberManager;
 +import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberView;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
 +import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
 +import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 +import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 +import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 +import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
 +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
 +import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.offheap.Chunk;
 +import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 +import com.gemstone.gemfire.internal.offheap.annotations.Released;
 +import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 +import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
 +import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
 +import com.gemstone.gemfire.i18n.StringId;
 +/**
 + * 
 + * @author Eric Zoerner
 + * @author Sudhir Menon
 + */
 +@SuppressWarnings("deprecation")
 +public class DistributedRegion extends LocalRegion implements 
 +    CacheDistributionAdvisee
 +{
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  /** causes cache profile to be added to afterRemoteRegionCreate notification for testing */
 +  public static boolean TEST_HOOK_ADD_PROFILE = false;
 +  
 +  /** Used to sync accesses to this.dlockService to allow lazy construction */
 +  private final Object dlockMonitor = new Object();
 +
 +  final CacheDistributionAdvisor distAdvisor;
 +
 +  /**
 +   * @guarded.By {@link #dlockMonitor}
 +   */
 +  private DistributedLockService dlockService;
 +
 +  protected final AdvisorListener advisorListener = new AdvisorListener();
 +
 +  /** Set of currently missing required roles */
 +  protected final HashSet missingRequiredRoles = new HashSet();
 +
 +  /** True if this region is currently missing any required roles */
 +  protected volatile boolean isMissingRequiredRoles = false;
 +  
 +  /**
 +   * True if this region is has any required roles defined and the LossAction is
 +   * either NO_ACCESS or LIMITED_ACCESS. Reliability checks will only happen if
 +   * this is true.
 +   */
 +  private final boolean requiresReliabilityCheck;
 +
 +  /**
 +   * Provides a queue for reliable message delivery
 +   * 
 +   * @since 5.0
 +   */
 +  protected final ReliableMessageQueue rmq;
 +
 +  /**
 +   * Latch that is opened after initialization waits for required roles up to
 +   * the <a href="DistributedSystem#member-timeout">member-timeout </a>.
 +   */
 +  protected final StoppableCountDownLatch initializationLatchAfterMemberTimeout; 
 +
 +  private final PersistenceAdvisor persistenceAdvisor;
 +  
 +  private final PersistentMemberID persistentId;
 +
 +  /**
 +   * This boolean is set to false when this region
 +   * is non-persistent, but there are persistent members in the distributed system
 +   * to which all region modifications should be forwarded
 +   * see bug 45186
 +   */
 +  private volatile boolean generateVersionTag = true;
 +
 +  /** Tests can set this to true and ignore reliability triggered reconnects */
 +  public static boolean ignoreReconnect = false;
 +  
 +  /**
 +   * Lock to prevent multiple threads on this member from performing
 +   * a clear at the same time.
 +   */
 +  private final Object clearLock = new Object();
 +
 +  private static AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
 +
 +  /** Creates a new instance of DistributedRegion */
 +  protected DistributedRegion(String regionName, RegionAttributes attrs,
 +      LocalRegion parentRegion, GemFireCacheImpl cache,
 +      InternalRegionArguments internalRegionArgs) {
 +    super(regionName, attrs, parentRegion, cache, internalRegionArgs);
 +    this.initializationLatchAfterMemberTimeout = new StoppableCountDownLatch(
 +        getCancelCriterion(), 1);
 +    this.distAdvisor = createDistributionAdvisor(internalRegionArgs);
 +    
 +    if (getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
 +        && !isInternalRegion() && !attrs.getScope().isAck() && !doesNotDistribute() && attrs.getDataPolicy().withStorage()) {
 +      logger.warn(LocalizedMessage.create(LocalizedStrings.DistributedRegion_REGION_0_1_SPLITBRAIN_CONFIG_WARNING,
 +          new Object[] { regionName, attrs.getScope() })); 
 +    }
 +    if (!getDistributionManager().getConfig().getEnableNetworkPartitionDetection() 
 +        && attrs.getDataPolicy().withPersistence() && !loggedNetworkPartitionWarning.getAndSet(true)) {
 +      logger.warn(LocalizedMessage.create(
 +          LocalizedStrings.DistributedRegion_REGION_0_ENABLE_NETWORK_PARTITION_WARNING,
 +          new Object[] { regionName, attrs.getScope() }));
 +    }
 +
 +    boolean setRequiresReliabilityCheck = attrs.getMembershipAttributes()
 +        .hasRequiredRoles()
 +        &&
 +        // note that the following includes NO_ACCESS, LIMITED_ACCESS,
 +        !attrs.getMembershipAttributes().getLossAction().isAllAccess()
 +        && !attrs.getMembershipAttributes().getLossAction().isReconnect();
 +
 +    // this optimization is safe for as long as Roles and Required Roles are
 +    // immutable
 +    // if this VM fulfills all required roles, make requiresReliabilityCheck
 +    // false
 +    Set reqRoles = new HashSet(attrs.getMembershipAttributes()
 +        .getRequiredRoles());
 +    reqRoles.removeAll(getSystem().getDistributedMember().getRoles());
 +    if (reqRoles.isEmpty()) {
 +      setRequiresReliabilityCheck = false;
 +    }
 +
 +    this.requiresReliabilityCheck = setRequiresReliabilityCheck;
 +
 +    {
 +      ReliableMessageQueue tmp = null;
 +      if (this.requiresReliabilityCheck) {
 +        //         if
 +        // (attrs.getMembershipAttributes().getLossAction().isAllAccessWithQueuing())
 +        // {
 +        //           tmp = cache.getReliableMessageQueueFactory().create(this);
 +        //         }
 +      }
 +      this.rmq = tmp;
 +    }
 +
 +    if(internalRegionArgs.isUsedForPartitionedRegionBucket()) {
 +      this.persistenceAdvisor = internalRegionArgs.getPersistenceAdvisor();
 +    } else if (this.allowsPersistence()){
 +      //TODO prpersist - using this lock service is a hack. Maybe? Or maybe
 +      //it's ok if we have one (rarely used) lock service for many operations?
 +      //What does the resource manager do?
 +      DistributedLockService dl = cache.getPartitionedRegionLockService();
 +      try {
 +        //TODO prpersist - this is just a quick and dirty storage mechanism so that
 +        //I can test the storage.
 +        DiskRegionStats diskStats;
 +        PersistentMemberView storage;
 +        if(getDataPolicy().withPersistence()) {
 +          storage = getDiskRegion();
 +          diskStats = getDiskRegion().getStats();
 +        } else {
 +          storage = new InMemoryPersistentMemberView();
 +          diskStats = null;
 +        }
 +        PersistentMemberManager memberManager = cache.getPersistentMemberManager();
 +        this.persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage, this.getFullPath(), diskStats, memberManager);
 +      } catch (Exception e) {
 +        throw new InternalGemFireError("Couldn't recover persistence");
 +      }
 +    } else {
 +      this.persistenceAdvisor = null;
 +    }
 +    if(this.persistenceAdvisor != null) {
 +      this.persistentId = persistenceAdvisor.generatePersistentID();
 +    } else {
 +      this.persistentId = null;
 +    }
 +    
 +  }
 +  
 +  @Override
 +  public void createEventTracker() {
 +    this.eventTracker = new EventTracker(this);
 +    this.eventTracker.start();
 +  }
 +  
 +  /**
 +   * Intended for used during construction of a DistributedRegion
 +   *  
 +   * @return the advisor to be used by the region
 +   */
 +  protected CacheDistributionAdvisor createDistributionAdvisor(InternalRegionArguments internalRegionArgs) {
 +    return CacheDistributionAdvisor.createCacheDistributionAdvisor(this);  // Warning: potential early escape of object before full construction
 +  }
 +  
 +  /**
 +   * Does this region support persistence?
 +   */
 +  public boolean allowsPersistence() {
 +    return true;
 +  }
 +
 +  @Override
 +  public boolean requiresOneHopForMissingEntry(EntryEventImpl event) {
 +    // received from another member - don't use one-hop
 +    if (event.isOriginRemote()) {
 +      return false; 
 +    }
 +    // local ops aren't distributed
 +    if (event.getOperation().isLocal()) {
 +      return false;
 +    }
 +    // if it already has a valid version tag it can go out with a DistributedCacheOperation
 +    if (event.getVersionTag() != null && event.getVersionTag().getRegionVersion() > 0) {
 +      return false;
 +    }
 +    // if we're not allowed to generate a version tag we need to send it to someone who can
 +    if (!this.generateVersionTag) {
 +      return true;
 +    }
 +    return this.concurrencyChecksEnabled &&
 +        (this.srp == null) &&
 +        !isTX() &&
 +        this.scope.isDistributed() &&
 +        !this.dataPolicy.withReplication();
 +  }
 +  
 +  
 +  /**
 +   * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object, 
 +   * boolean, long, boolean)
 +   */
 +  @Override
 +  protected
 +  boolean virtualPut(EntryEventImpl event,
 +                     boolean ifNew,
 +                     boolean ifOld,
 +                     Object expectedOldValue,
 +                     boolean requireOldValue,
 +                     long lastModified,
 +                     boolean overwriteDestroyed)
 +  throws TimeoutException,
 +         CacheWriterException {
 +    final boolean isTraceEnabled = logger.isTraceEnabled();
 +    
 +    Lock dlock = null;
 +    if (this.scope.isGlobal() && // lock only applies to global scope
 +        !event.isOriginRemote() && // only if operation originating locally
 +        !event.isNetSearch() && // search and load processor handles own locking
 +        !event.isNetLoad() &&
 +        // @todo darrel/kirk: what about putAll?
 +        !event.isLocalLoad() &&
 +        !event.isSingleHopPutOp()) { // Single Hop Op means dlock is already taken at origin node.
 +      dlock = this.getDistributedLockIfGlobal(event.getKey());
 +    }
 +    if (isTraceEnabled) {
 +      logger.trace("virtualPut invoked for event {}", event);
 +    }
 +    try {
 +      if (!hasSeenEvent(event)) {
 +        if (this.requiresOneHopForMissingEntry(event)) {
 +          // bug #45704: see if a one-hop must be done for this operation
 +          RegionEntry re = getRegionEntry(event.getKey());
 +          if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
 +            if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
 +              // putAll will send a single one-hop for empty regions.  for other missing entries
 +              // we need to get a valid version number before modifying the local cache 
 +              boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
 +                  false, false, expectedOldValue, requireOldValue, !this.generateVersionTag);
 +
 +              if (!didDistribute && isTraceEnabled) {
 +                logger.trace("Unable to perform one-hop messaging");
 +              }
 +              if (!this.generateVersionTag && !didDistribute) {
 +                throw new PersistentReplicatesOfflineException();
 +              }
 +              if (didDistribute) {
 +                if (isTraceEnabled) {
 +                  logger.trace("Event after remotePut operation: {}", event);
 +                }
 +                if (event.getVersionTag() == null) {
 +                  // if the event wasn't applied by the one-hop replicate it will not have a version tag
 +                  // and so should not be applied to this cache
 +                  return false;
 +                }
 +              }
 +            }
 +          }
 +        }
 +        return super.virtualPut(event,
 +                                ifNew,
 +                                ifOld,
 +                                expectedOldValue,
 +                                requireOldValue,
 +                                lastModified,
 +                                overwriteDestroyed);
 +      }
 +      else {
 +        if (event.getDeltaBytes() != null && event.getRawNewValue() == null) {
 +          // This means that this event has delta bytes but no full value.
 +          // Request the full value of this event.
 +          // The value in this vm may not be same as this event's value.
 +          throw new InvalidDeltaException(
 +              "Cache encountered replay of event containing delta bytes for key "
 +                  + event.getKey());
 +        }
 +        // if the listeners have already seen this event, then it has already
 +        // been successfully applied to the cache.  Distributed messages and
 +        // return
 +        if (isTraceEnabled) {
 +          logger.trace("DR.virtualPut: this cache has already seen this event {}", event);
 +        }
 +        
 +        // Gester, Fix 39014: when hasSeenEvent, put will still distribute
 +        // event, but putAll did not. We add the logic back here, not to put 
 +        // back into DR.distributeUpdate() because we moved this part up into 
 +        // LR.basicPutPart3 in purpose. Reviewed by Bruce.  
 +        if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
 +          event.getPutAllOperation().addEntry(event, true);
 +        }
 +
 +        /* doing this so that other VMs will apply this no matter what. If it 
 +         * is an "update" they will not apply it if they don't have the key.
 +         * Because this is probably a retry, it will never get applied to this 
 +         * local AbstractRegionMap, and so will never be flipped to a 'create'
 +         */
 +        event.makeCreate();
-         distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
-         event.invokeCallbacks(this,true, true);
++        if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++          distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
++          event.invokeCallbacks(this,true, true);
++        }
 +        return true;
 +      }
 +    }
 +    finally {
 +      if (dlock != null) {
 +        dlock.unlock();
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  protected RegionEntry basicPutEntry(EntryEventImpl event, long lastModified)
 +      throws TimeoutException, CacheWriterException {
 +    
 +    final boolean isTraceEnabled = logger.isTraceEnabled();
 +    
 +    if (isTraceEnabled) {
 +      logger.trace("basicPutEntry invoked for event {}", event);
 +    }
 +    if (this.requiresOneHopForMissingEntry(event)) {
 +      // bug #45704: see if a one-hop must be done for this operation
 +      RegionEntry re = getRegionEntry(event.getKey());
 +      if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
 +        final boolean ifNew = false;
 +        final boolean ifOld = false;
 +        boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
 +            ifNew, ifOld, null, false, !this.generateVersionTag);
 +        if (!this.generateVersionTag && !didDistribute) {
 +          throw new PersistentReplicatesOfflineException();
 +        }
 +        if (didDistribute && isTraceEnabled) {
 +          logger.trace("Event after remotePut for basicPutEntry: {}", event);
 +        }
 +      }
 +    }
 +    return super.basicPutEntry(event, lastModified);
 +  }
 +
 +  @Override
 +  public void performPutAllEntry(EntryEventImpl event) {
 +	  /*
 +	   * force shared data view so that we just do the virtual op, accruing things in the put all operation for later
 +	   */
 +	if(isTX()) {
 +		event.getPutAllOperation().addEntry(event);
 +	} else {
 +		getSharedDataView().putEntry(event, false, false, null, false, 0L, false);
 +	}
 +  }
 +  
 +  @Override
 +  public void performRemoveAllEntry(EntryEventImpl event) {
 +    // force shared data view so that we just do the virtual op, accruing things in the bulk operation for later
 +    if(isTX()) {
 +      event.getRemoveAllOperation().addEntry(event);
 +    } else {
 +      basicDestroy(event, true, null);
 +      //getSharedDataView().destroyExistingEntry(event, true, null);
 +    }
 +  }
 +  
 +  /**
 +   * distribution and listener notification
 +   */
 +  @Override
 +  public void basicPutPart3(EntryEventImpl event, RegionEntry entry,
 +      boolean isInitialized, long lastModified, boolean invokeCallbacks,
 +      boolean ifNew, boolean ifOld, Object expectedOldValue,
 +      boolean requireOldValue) {
 +    
 +    distributeUpdate(event, lastModified, false, false, null, false);
 +    super.basicPutPart3(event, entry, isInitialized, lastModified,
 +        invokeCallbacks, ifNew, ifOld, expectedOldValue, requireOldValue);
 +  }
 +
 +  /** distribute an update operation */
 +  protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
 +    // an update from a netSearch is not distributed
 +    if (!event.isOriginRemote() && !event.isNetSearch() && !event.isBulkOpInProgress()) {
 +      boolean distribute = true;
 +        if (event.getInhibitDistribution()) {
 +          // this has already been distributed by a one-hop operation
 +          distribute = false;
 +        }
 +        if (distribute) {
 +          UpdateOperation op = new UpdateOperation(event, lastModified);
 +          if (logger.isTraceEnabled()) {
 +            logger.trace("distributing operation for event : {} : for region : {}", event, this.getName());
 +          }
 +          op.distribute();
 +        }
 +    }
 +  }
 +
 +  protected void setGeneratedVersionTag(boolean generateVersionTag) {
 +    // there is at-least one other persistent member, so turn on concurrencyChecks
 +    enableConcurrencyChecks();
 +    
 +    this.generateVersionTag = generateVersionTag;
 +  }
 +
 +  protected boolean getGenerateVersionTag() {
 +    return this.generateVersionTag;
 +  }
 +
 +  @Override
 +  protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
 +    if (logger.isTraceEnabled()) {
 +      logger.trace("shouldGenerateVersionTag this.generateVersionTag={} ccenabled={} dataPolicy={} event:{}",
 +          this.generateVersionTag, this.concurrencyChecksEnabled, this.dataPolicy, event);
 +    }
 +    if (!this.concurrencyChecksEnabled || this.dataPolicy == DataPolicy.EMPTY || !this.generateVersionTag) {
 +      return false;
 +    }
 +    if (this.srp != null) { // client
 +      return false;
 +    }
 +    if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
 +      return false;
 +    }
 +    if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag
 +      return false;
 +    }
 +    if (!event.isOriginRemote() && this.dataPolicy.withReplication()) {
 +      return true;
 +    }
 +    if (!this.dataPolicy.withReplication() && !this.dataPolicy.withPersistence()) {
 +      if (!entry.getVersionStamp().hasValidVersion()) {
 +        // do not generate a version stamp in a region that has no replication if it's not based
 +        // on an existing version from a replicate region
 +        return false;
 +      }
 +      return true;
 +    }
 +    if (!event.isOriginRemote() && event.getDistributedMember() != null) {
 +      if (!event.getDistributedMember().equals(this.getMyId())) {
 +        return event.getVersionTag() == null; // one-hop remote message
 +      }
 +    }
 +    return false;
 +  }
 +  /**
 +   * Throws RegionAccessException if required roles are missing and the
 +   * LossAction is NO_ACCESS
 +   * 
 +   * @throws RegionAccessException
 +   *           if required roles are missing and the LossAction is NO_ACCESS
 +   */
 +  @Override
 +  protected void checkForNoAccess()
 +  {
 +    if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
 +      if (getMembershipAttributes().getLossAction().isNoAccess()) {
 +        synchronized (this.missingRequiredRoles) {
 +          if (!this.isMissingRequiredRoles)
 +            return;
 +          Set roles = Collections.unmodifiableSet(new HashSet(
 +              this.missingRequiredRoles));
 +          throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1.toLocalizedString(new Object[] {getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Throws RegionAccessException is required roles are missing and the
 +   * LossAction is either NO_ACCESS or LIMITED_ACCESS.
 +   * 
 +   * @throws RegionAccessException
 +   *           if required roles are missing and the LossAction is either
 +   *           NO_ACCESS or LIMITED_ACCESS
 +   */
 +  @Override
 +  protected void checkForLimitedOrNoAccess()
 +  {
 +    if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
 +      if (getMembershipAttributes().getLossAction().isNoAccess()
 +          || getMembershipAttributes().getLossAction().isLimitedAccess()) {
 +        synchronized (this.missingRequiredRoles) {
 +          if (!this.isMissingRequiredRoles)
 +            return;
 +          Set roles = Collections.unmodifiableSet(new HashSet(
 +              this.missingRequiredRoles));
 +          Assert.assertTrue(!roles.isEmpty());
 +          throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1
 +              .toLocalizedString(new Object[] { getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
 +        }
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  protected void handleReliableDistribution(ReliableDistributionData data,
 +      Set successfulRecipients) {
 +    handleReliableDistribution(data, successfulRecipients,
 +        Collections.EMPTY_SET, Collections.EMPTY_SET);
 +  }
 +
 +  protected void handleReliableDistribution(ReliableDistributionData data,
 +      Set successfulRecipients, Set otherRecipients1, Set otherRecipients2)
 +  {
 +    if (this.requiresReliabilityCheck) {
 +      MembershipAttributes ra = getMembershipAttributes();
 +      Set recipients = successfulRecipients;
 +      // determine the successful roles
 +      Set roles = new HashSet();
 +      for (Iterator iter = recipients.iterator(); iter.hasNext();) {
 +        InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
 +        if (mbr != null) {
 +          roles.addAll(mbr.getRoles());
 +        }
 +      }
 +      for (Iterator iter = otherRecipients1.iterator(); iter.hasNext();) {
 +        InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
 +        if (mbr != null) {
 +          roles.addAll(mbr.getRoles());
 +        }
 +      }
 +      for (Iterator iter = otherRecipients2.iterator(); iter.hasNext();) {
 +        InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
 +        if (mbr != null) {
 +          roles.addAll(mbr.getRoles());
 +        }
 +      }
 +      // determine the missing roles
 +      Set failedRoles = new HashSet(ra.getRequiredRoles());
 +      failedRoles.removeAll(roles);
 +      if (failedRoles.isEmpty())
 +        return;
 +//       if (rp.isAllAccessWithQueuing()) {
 +//         this.rmq.add(data, failedRoles);
 +//       } else {
 +
 +      throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_MAY_HAVE_FAILED_TO_NOTIFY_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
 +//       }
 +    }
 +  }
 +
 +  /**
 +   * 
 +   * Called when we do a distributed operation and don't have anyone to
 +   * distributed it too. Since this is only called when no distribution was done
 +   * (i.e. no recipients) we do not check isMissingRequiredRoles because it
 +   * might not longer be true due to race conditions
 +   * 
 +   * @return false if this region has at least one required role and queuing is
 +   *         configured. Returns true if sending to no one is ok.
 +   * @throws RoleException
 +   *           if a required role is missing and the LossAction is either
 +   *           NO_ACCESS or LIMITED_ACCESS.
 +   * @since 5.0
 +   */
 +  protected boolean isNoDistributionOk()
 +  {
 +    if (this.requiresReliabilityCheck) {
 +      MembershipAttributes ra = getMembershipAttributes();
 +      //       if (ra.getLossAction().isAllAccessWithQueuing()) {
 +      //         return !ra.hasRequiredRoles();
 +      //       } else {
 +      Set failedRoles = ra.getRequiredRoles();
 +      throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_WAS_NOT_DONE_TO_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
 +      //       }
 +    }
 +    return true;
 +  }
 +  
 +  /**
 +   * returns true if this Region does not distribute its operations to other
 +   * members.
 +   * @since 6.0
 +   * @see HARegion#localDestroyNoCallbacks(Object)
 +   */
 +  public boolean doesNotDistribute() {
 +    return false;
 +  }
 +
 +  
 +  @Override
 +  public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
 +    return !doesNotDistribute() && super.shouldSyncForCrashedMember(id);
 +  }
 +  
 +  
 +  /**
 +   * Adjust the specified set of recipients by removing any of them that are
 +   * currently having their data queued.
 +   * 
 +   * @param recipients
 +   *          the set of recipients that a message is to be distributed too.
 +   *          Recipients that are currently having their data queued will be
 +   *          removed from this set.
 +   * @return the set, possibly null, of recipients that are currently having
 +   *         their data queued.
 +   * @since 5.0
 +   */
 +  protected Set adjustForQueuing(Set recipients)
 +  {
 +    Set result = null;
 +    //     if (this.requiresReliabilityCheck) {
 +    //       MembershipAttributes ra = getMembershipAttributes();
 +    //       if (ra.getLossAction().isAllAccessWithQueuing()) {
 +    //         Set currentQueuedRoles = this.rmq.getQueuingRoles();
 +    //         if (currentQueuedRoles != null) {
 +    //           // foreach recipient see if any of his roles are queued and if
 +    //           // they are remove him from recipients and add him to result
 +    //           Iterator it = recipients.iterator();
 +    //           while (it.hasNext()) {
 +    //             DistributedMember dm = (DistributedMember)it.next();
 +    //             Set dmRoles = dm.getRoles();
 +    //             if (!dmRoles.isEmpty()) {
 +    //               if (intersects(dmRoles, currentQueuedRoles)) {
 +    //                 it.remove(); // fix for bug 34447
 +    //                 if (result == null) {
 +    //                   result = new HashSet();
 +    //                 }
 +    //                 result.add(dm);
 +    //               }
 +    //             }
 +    //           }
 +    //         }
 +    //       }
 +    //     }
 +    return result;
 +  }
 +
 +  /**
 +   * Returns true if the two sets intersect
 +   * 
 +   * @param a
 +   *          a non-null non-empty set
 +   * @param b
 +   *          a non-null non-empty set
 +   * @return true if sets a and b intersect; false if not
 +   * @since 5.0
 +   */
 +  public static boolean intersects(Set a, Set b)
 +  {
 +    Iterator it;
 +    Set target;
 +    if (a.size() <= b.size()) {
 +      it = a.iterator();
 +      target = b;
 +    }
 +    else {
 +      it = b.iterator();
 +      target = a;
 +    }
 +    while (it.hasNext()) {
 +      if (target.contains(it.next()))
 +        return true;
 +    }
 +    return false;
 +  }
 +
 +  @Override
 +  public boolean requiresReliabilityCheck()
 +  {
 +    return this.requiresReliabilityCheck;
 +  }
 +
 +  /**
 +   * Returns true if the ExpiryTask is currently allowed to expire.
 +   * <p>
 +   * If the region is in NO_ACCESS due to reliability configuration, then no
 +   * expiration actions are allowed.
 +   * <p>
 +   * If the region is in LIMITED_ACCESS due to reliability configuration, then
 +   * only non-distributed expiration actions are allowed.
 +   */
 +  @Override
 +  protected boolean isExpirationAllowed(ExpiryTask expiry)
 +  {
 +    if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
 +      if (getMembershipAttributes().getLossAction().isNoAccess()) {
 +        return false;
 +      }
 +      if (getMembershipAttributes().getLossAction().isLimitedAccess()
 +          && expiry.isDistributedAction()) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +
 +  /**
 +   * Performs the resumption action when reliability is resumed.
 +   * 
 +   * @return true if asynchronous resumption is triggered
 +   */
 +  protected boolean resumeReliability(InternalDistributedMember id,
 +      Set newlyAcquiredRoles)
 +  {
 +    boolean async = false;
 +    try {
 +      ResumptionAction ra = getMembershipAttributes().getResumptionAction();
 +      if (ra.isNone()) {
 +        if (logger.isDebugEnabled()) {
 +          logger.debug("Reliability resumption for action of none");
 +        }
 +        resumeExpiration();
 +      }
 +      else if (ra.isReinitialize()) {
 +        async = true;
 +        asyncResumeReliability(id, newlyAcquiredRoles);
 +      }
 +    }
 +    catch (Exception e) {
 +      logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
 +    }
 +    return async;
 +  }
 +
 +  /**
 +   * Handles asynchronous ResumptionActions such as region reinitialize.
 +   */
 +  private void asyncResumeReliability(final InternalDistributedMember id,
 +                                      final Set newlyAcquiredRoles)
 +                               throws RejectedExecutionException {
 +    final ResumptionAction ra = getMembershipAttributes().getResumptionAction();
 +    getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
 +      public void run()
 +      {
 +        try {
 +          if (ra.isReinitialize()) {
 +            if (logger.isDebugEnabled()) {
 +              logger.debug("Reliability resumption for action of reinitialize");
 +            }
 +            if (!isDestroyed() && !cache.isClosed()) {
 +              RegionEventImpl event = new RegionEventImpl(
 +                  DistributedRegion.this, Operation.REGION_REINITIALIZE, null,
 +                  false, getMyId(), generateEventID());
 +              reinitialize(null, event);
 +            }
 +            synchronized (missingRequiredRoles) {
 +              // any number of threads may be waiting on missingRequiredRoles
 +              missingRequiredRoles.notifyAll();
 +              if (hasListener() && id != null) {
 +                // fire afterRoleGain event
 +                RoleEventImpl relEvent = new RoleEventImpl(
 +                    DistributedRegion.this, Operation.REGION_CREATE, null,
 +                    true, id, newlyAcquiredRoles);
 +                dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN,
 +                    relEvent);
 +              }
 +            }
 +          }
 +        }
 +        catch (Exception e) {
 +          logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
 +        }
 +      }
 +    });
 +  }
 +
 +  /** Reschedules expiry tasks when reliability is resumed. */
 +  private void resumeExpiration()
 +  {
 +    boolean isNoAccess = getMembershipAttributes().getLossAction().isNoAccess();
 +    boolean isLimitedAccess = getMembershipAttributes().getLossAction()
 +        .isLimitedAccess();
 +    if (!(isNoAccess || isLimitedAccess)) {
 +      return; // early out: expiration was never affected by reliability
 +    }
 +
 +    if (getEntryTimeToLive().getTimeout() > 0
 +        && (isNoAccess || (isLimitedAccess && getEntryTimeToLive().getAction()
 +            .isDistributed()))) {
 +      rescheduleEntryExpiryTasks();
 +    }
 +    else 
 +    if (getEntryIdleTimeout().getTimeout() > 0
 +        && (isNoAccess || (isLimitedAccess && getEntryIdleTimeout().getAction()
 +            .isDistributed()))) {
 +      rescheduleEntryExpiryTasks();
 +    }
 +    else
 +    if (getCustomEntryTimeToLive() != null || getCustomEntryIdleTimeout() != null) {
 +      // Force all entries to be rescheduled
 +      rescheduleEntryExpiryTasks();
 +    }
 +
 +    if (getRegionTimeToLive().getTimeout() > 0
 +        && (isNoAccess || (isLimitedAccess && getRegionTimeToLive().getAction()
 +            .isDistributed()))) {
 +      addTTLExpiryTask();
 +    }
 +    if (getRegionIdleTimeout().getTimeout() > 0
 +        && (isNoAccess || (isLimitedAccess && getRegionIdleTimeout()
 +            .getAction().isDistributed()))) {
 +      addIdleExpiryTask();
 +    }
 +  }
 +
 +  /**
 +   * A boolean used to indicate if its the intialization time i.e the
 +   * distributed Region is created for the first time. The variable is used at
 +   * the time of lost reliablility.
 +   */
 +  private boolean isInitializingThread = false;
 +
 +  /**
 +   * Called when reliability is lost. If MembershipAttributes are configured
 +   * with {@link LossAction#RECONNECT}then DistributedSystem reconnect will be
 +   * called asynchronously.
 +   * 
 +   * @return true if asynchronous resumption is triggered
 +   */
 +  protected boolean lostReliability(final InternalDistributedMember id,
 +      final Set newlyMissingRoles)
 +  {
 +    if (DistributedRegion.ignoreReconnect)
 +      return false;
 +    boolean async = false;
 +    try {
 +      if (getMembershipAttributes().getLossAction().isReconnect()) {
 +        async = true;
 +        if (isInitializingThread) {
 +          doLostReliability(true, id, newlyMissingRoles);
 +        }
 +        else {
 +          doLostReliability(false, id, newlyMissingRoles);
 +        }
 +        // we don't do this in the waiting pool because we're going to
 +        // disconnect
 +        // the distributed system, and it will wait for the pool to empty
 +        /*
 +         * moved to a new method called doLostReliablity. Thread t = new
 +         * Thread("Reconnect Distributed System") { public void run() { try { //
 +         * TODO: may need to check isReconnecting and checkReadiness...
 +         * initializationLatchAfterMemberTimeout.await(); // TODO:
 +         * call reconnect here
 +         * getSystem().tryReconnect((GemFireCache)getCache()); // added for
 +         * reconnect. synchronized (missingRequiredRoles) { // any number of
 +         * threads may be waiting on missingRequiredRoles
 +         * missingRequiredRoles.notifyAll(); // need to fire an event if id is
 +         * not null if (hasListener() && id != null) { RoleEventImpl relEvent =
 +         * new RoleEventImpl( DistributedRegion.this, Operation.CACHE_RECONNECT,
 +         * null, true, id, newlyMissingRoles); dispatchListenerEvent(
 +         * EnumListenerEvent.AFTER_ROLE_LOSS, relEvent); } } } catch (Exception
 +         * e) { } } };
 +         * t.setDaemon(true); t.start();
 +         */
 +      }
 +    }
 +    catch (CancelException cce) {
 +      throw cce;
 +    }
 +    catch (Exception e) {
 +      logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
 +    }
 +    return async;
 +  }
 +
 +  private void doLostReliability(boolean isInitializing,
 +      final InternalDistributedMember id, final Set newlyMissingRoles)
 +  {
 +    try {
 +      if (!isInitializing) {
 +        // moved code to a new thread.
 +        Thread t = new Thread(LocalizedStrings.DistributedRegion_RECONNECT_DISTRIBUTED_SYSTEM.toLocalizedString()) {
 +          @Override
 +          public void run()
 +          {
 +            try {
 +              // TODO: may need to check isReconnecting and checkReadiness...
 +              if (logger.isDebugEnabled()) {
 +                logger.debug("Reliability loss with policy of reconnect and membership thread doing reconnect");
 +              }
 +              initializationLatchAfterMemberTimeout.await();
 +              getSystem().tryReconnect(false, "Role Loss", getCache());
 +              synchronized (missingRequiredRoles) {
 +                // any number of threads may be waiting on missingRequiredRoles
 +                missingRequiredRoles.notifyAll();
 +                // need to fire an event if id is not null
 +                if (hasListener() && id != null) {
 +                  RoleEventImpl relEvent = new RoleEventImpl(
 +                      DistributedRegion.this, Operation.CACHE_RECONNECT, null,
 +                      true, id, newlyMissingRoles);
 +                  dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS,
 +                      relEvent);
 +                }
 +              }
 +            }
 +            catch (Exception e) {
 +              logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
 +            }
 +          }
 +        };
 +        t.setDaemon(true);
 +        t.start();
 +
 +      }
 +      else {
 +        getSystem().tryReconnect(false, "Role Loss", getCache()); // added for
 +        // reconnect.
 +        synchronized (missingRequiredRoles) {
 +          // any number of threads may be waiting on missingRequiredRoles
 +          missingRequiredRoles.notifyAll();
 +          // need to fire an event if id is not null
 +          if (hasListener() && id != null) {
 +            RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
 +                Operation.CACHE_RECONNECT, null, true, id, newlyMissingRoles);
 +            dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
 +          }
 +        }
 +        // } catch (CancelException cce){
 +
 +        // }
 +
 +      }
 +    }
 +    catch (CancelException ignor) {
 +      throw ignor;
 +    }
 +    catch (Exception e) {
 +      logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
 +    }
 +
 +  }
 +
 +  protected void lockCheckReadiness()
 +  {
 +    // fix for bug 32610
 +    cache.getCancelCriterion().checkCancelInProgress(null);
 +    checkReadiness();
 +  }
 +
 +  @Override
 +  public final Object validatedDestroy(Object key, EntryEventImpl event)
 +      throws TimeoutException, EntryNotFoundException, CacheWriterException {
 +    Lock dlock = this.getDistributedLockIfGlobal(key);
 +    try {
 +      return super.validatedDestroy(key, event);
 +    } finally {
 +      if (dlock != null) {
 +        dlock.unlock();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * @see LocalRegion#localDestroyNoCallbacks(Object)
 +   */
 +  @Override
 +  public void localDestroyNoCallbacks(Object key)
 +  {
 +    super.localDestroyNoCallbacks(key);
 +    if (getScope().isGlobal()) {
 +      try {
 +        this.getLockService().freeResources(key);
 +      }
 +      catch (LockServiceDestroyedException ignore) {
 +      }
 +    }
 +  }
 +
 +  /**
 +   * @see LocalRegion#localDestroy(Object, Object)
 +   */
 +  @Override
 +  public void localDestroy(Object key, Object aCallbackArgument)
 +      throws EntryNotFoundException
 +  {
 +    super.localDestroy(key, aCallbackArgument);
 +    if (getScope().isGlobal()) {
 +      try {
 +        this.getLockService().freeResources(key);
 +      }
 +      catch (LockServiceDestroyedException ignore) {
 +      }
 +    }
 +  }
 +
 +  /**
 +   * @see LocalRegion#invalidate(Object, Object)
 +   */
 +  @Override
 +  public void invalidate(Object key, Object aCallbackArgument)
 +      throws TimeoutException, EntryNotFoundException
 +  {
 +    validateKey(key);
 +    validateCallbackArg(aCallbackArgument);
 +    checkReadiness();
 +    checkForLimitedOrNoAccess();
 +    Lock dlock = this.getDistributedLockIfGlobal(key);
 +    try {
 +      super.validatedInvalidate(key, aCallbackArgument);
 +    }
 +    finally {
 +      if (dlock != null)
 +        dlock.unlock();
 +    }
 +  }
 +
 +  @Override
 +  public Lock getRegionDistributedLock() throws IllegalStateException
 +  {
 +    lockCheckReadiness();
 +    checkForLimitedOrNoAccess();
 +    if (!this.scope.isGlobal()) {
 +      throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
 +    }
 +    return new RegionDistributedLock();
 +  }
 +
 +  @Override
 +  public Lock getDistributedLock(Object key) throws IllegalStateException
 +  {
 +    validateKey(key);
 +    lockCheckReadiness();
 +    checkForLimitedOrNoAccess();
 +    if (!this.scope.isGlobal()) {
 +      throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
 +    }
 +    if (isLockingSuspendedByCurrentThread()) {
 +      throw new IllegalStateException(LocalizedStrings.DistributedRegion_THIS_THREAD_HAS_SUSPENDED_ALL_LOCKING_FOR_THIS_REGION.toLocalizedString());
 +    }
 +    return new DistributedLock(key);
 +  }
 +
 +  /**
 +   * Called while NOT holding lock on parent's subregions
 +   * 
 +   * @throws IllegalStateException
 +   *           if region is not compatible with a region in another VM.
 +   * 
 +   * @see LocalRegion#initialize(InputStream, InternalDistributedMember, InternalRegionArguments)
 +   */
 +  @Override
 +  protected void initialize(InputStream snapshotInputStream,
 +      InternalDistributedMember imageTarget, InternalRegionArguments internalRegionArgs) throws TimeoutException,
 +      IOException, ClassNotFoundException
 +  {
 +    Assert.assertTrue(!isInitialized());
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("DistributedRegion.initialize BEGIN: {}", getFullPath());
 +    }
 +
 +    // if we're versioning entries we need a region-level version vector
 +    if (this.scope.isDistributed() && this.concurrencyChecksEnabled) {
 +      createVersionVector();
 +    }
 +
 +    if (this.scope.isGlobal()) {
 +      getLockService(); // create lock service eagerly now
 +    }
 +
 +    final IndexUpdater indexUpdater = getIndexUpdater();
 +    boolean sqlfGIILockTaken = false;
 +    // this try block is to release the SQLF GII lock in finally
 +    // which should be done after bucket status will be set
 +    // properly in LocalRegion#initialize()
 +    try {
 +     try {
 +      try {
 +        // take the GII lock to avoid missing entries while updating the
 +        // index list for SQLFabric (#41330 and others)
 +        if (indexUpdater != null) {
 +          indexUpdater.lockForGII();
 +          sqlfGIILockTaken = true;
 +        }
 +        
 +        PersistentMemberID persistentId = null;
 +        boolean recoverFromDisk = isRecoveryNeeded();
 +        DiskRegion dskRgn = getDiskRegion();
 +        if (recoverFromDisk) {
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("DistributedRegion.getInitialImageAndRecovery: Starting Recovery");
 +          }
 +          dskRgn.initializeOwner(this); // do recovery
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery");
 +          }
 +          persistentId = dskRgn.getMyPersistentID();
 +        }
 +        
 +        // Create OQL indexes before starting GII.
 +        createOQLIndexes(internalRegionArgs, recoverFromDisk);
 + 
 +        if (getDataPolicy().withReplication()
 +            || getDataPolicy().withPreloaded()) {
 +          getInitialImageAndRecovery(snapshotInputStream, imageTarget,
 +              internalRegionArgs, recoverFromDisk, persistentId);
 +        }
 +        else {
 +          new CreateRegionProcessor(this).initializeRegion();
 +          if (snapshotInputStream != null) {
 +            releaseBeforeGetInitialImageLatch();
 +            loadSnapshotDuringInitialization(snapshotInputStream);
 +          }
 +        }
 +      }
 +      catch (DiskAccessException dae) {
 +        this.handleDiskAccessException(dae, true);
 +        throw dae;
 +      }
 +
 +      initMembershipRoles();
 +      isInitializingThread = false;
 +      super.initialize(null, null, null); // makes sure all latches are released if they haven't been already
 +     } finally {
 +      if (this.eventTracker != null) {
 +        this.eventTracker.setInitialized();
 +      }
 +     }
 +    } finally {
 +      if (sqlfGIILockTaken) {
 +        indexUpdater.unlockForGII();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void initialized() {
 +    new UpdateAttributesProcessor(this).distribute(false);
 +  }
 +
 +  /** True if GII was impacted by missing required roles */
 +  private boolean giiMissingRequiredRoles = false;
 +
 +  /**
 +   * A reference counter to protected the memoryThresholdReached boolean
 +   */
 +  private final Set<DistributedMember> memoryThresholdReachedMembers =
 +    new CopyOnWriteArraySet<DistributedMember>();
 +
 +  private ConcurrentParallelGatewaySenderQueue hdfsQueue;
 +
 +  /** Sets and returns giiMissingRequiredRoles */
 +  private boolean checkInitialImageForReliability(
 +      InternalDistributedMember imageTarget,
 +      CacheDistributionAdvisor.InitialImageAdvice advice)
 +  {
 +    // assumption: required roles are interesting to GII only if Reinitialize...
 +//    if (true)
 +      return false;
 +//    if (getMembershipAttributes().hasRequiredRoles()
 +//        && getMembershipAttributes().getResumptionAction().isReinitialize()) {
 +//      // are any required roles missing for GII with Reinitialize?
 +//      Set missingRR = new HashSet(getMembershipAttributes().getRequiredRoles());
 +//      missingRR.removeAll(getSystem().getDistributedMember().getRoles());
 +//      for (Iterator iter = advice.replicates.iterator(); iter.hasNext();) {
 +//        DistributedMember member = (DistributedMember)iter.next();
 +//        missingRR.removeAll(member.getRoles());
 +//      }
 +//      for (Iterator iter = advice.others.iterator(); iter.hasNext();) {
 +//        DistributedMember member = (DistributedMember)iter.next();
 +//        missingRR.removeAll(member.getRoles());
 +//      }
 +//      for (Iterator iter = advice.preloaded.iterator(); iter.hasNext();) {
 +//        DistributedMember member = (DistributedMember)iter.next();
 +//        missingRR.removeAll(member.getRoles());
 +//      }
 +//      if (!missingRR.isEmpty()) {
 +//        // entering immediate loss condition, which will cause reinit on resume
 +//        this.giiMissingRequiredRoles = true;
 +//      }
 +//    }
 +//    return this.giiMissingRequiredRoles;
 +  }
 +
 +  private void getInitialImageAndRecovery(InputStream snapshotInputStream,
 +      InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs,
 +      boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException
 +  {
 +    logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_0, this.getName()));
 +  
 +    ImageState imgState = getImageState();
 +    imgState.init();
 +    boolean targetRecreated = internalRegionArgs.getRecreateFlag();
 +    Boolean isCBool = (Boolean)isConversion.get();
 +    boolean isForConversion = isCBool!=null?isCBool.booleanValue():false;
 +
 +    if (recoverFromDisk && snapshotInputStream != null && !isForConversion) {
 +      throw new InternalGemFireError(LocalizedStrings.DistributedRegion_IF_LOADING_A_SNAPSHOT_THEN_SHOULD_NOT_BE_RECOVERING_ISRECOVERING_0_SNAPSHOTSTREAM_1.toLocalizedString(new Object[] {Boolean.valueOf(recoverFromDisk), snapshotInputStream}));
 +    }
 +
 +    ProfileExchangeProcessor targetProvider;
 +    if (dataPolicy.withPersistence()) {
 +      targetProvider = new CreatePersistentRegionProcessor(this,
 +          getPersistenceAdvisor(), recoverFromDisk);
 +    }
 +    else {
 +      // this will go in the advisor profile
 +      targetProvider = new CreateRegionProcessor(this);
 +    }
 +    imgState.setInRecovery(false);
 +    RegionVersionVector recovered_rvv = null;
 +    if (dataPolicy.withPersistence()) {
 +      recovered_rvv = (this.getVersionVector()==null?null:this.getVersionVector().getCloneForTransmission());
 +    }
 +      // initializeRegion will send out our profile
 +    targetProvider.initializeRegion();
 +    
 +    if(persistenceAdvisor != null) {
 +      persistenceAdvisor.initialize();
 +    }
 +    
 +    // Register listener here so that the remote members are known
 +    // since registering calls initializeCriticalMembers (which needs to know about
 +    // remote members
 +    if (!isInternalRegion()) {
 +      if (!this.isDestroyed) {
 +        cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
 +      }
 +    }
 +    
 +    releaseBeforeGetInitialImageLatch();
 +
 +    // allow GII to invoke test hooks.  Do this just after releasing the
 +    // before-gii latch for bug #48962.  See ConcurrentLeaveDuringGIIDUnitTest
 +    InitialImageOperation.beforeGetInitialImage(this);
 +    
 +    if (snapshotInputStream != null) {
 +      try {
 +        if (logger.isDebugEnabled()) {
 +          logger.debug("DistributedRegion.getInitialImageAndRecovery: About to load snapshot, isInitialized={}; {}",
 +              isInitialized(), getFullPath());
 +        }
 +        loadSnapshotDuringInitialization(snapshotInputStream);
 +      }
 +      catch (IOException e) {
 +        throw new RuntimeException(e); // @todo change this exception?
 +      }
 +      catch (ClassNotFoundException e) {
 +        throw new RuntimeException(e); // @todo change this exception?
 +      }
 +      cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
 +      return;
 +    }
 +    
 +    // No snapshot provided, use the imageTarget(s)
 +
 +    // if we were given a recommended imageTarget, use that first, and
 +    // treat it like it is a replicate (regardless of whether it actually is
 +    // or not)
 +
 +    InitialImageOperation iiop = new InitialImageOperation(this, this.entries);
 +    // [defunct] Special case GII for PR admin regions (which are always
 +    // replicates and always writers
 +    // bruce: this was commented out after adding the GIIAckRequest logic to
 +    // force
 +    //        consistency before the gii operation begins
 +    //      if (isUsedForPartitionedRegionAdmin() ||
 +    // isUsedForPartitionedRegionBucket()) {
 +    //        releaseBeforeGetInitialImageLatch();
 +    //        iiop.getFromAll(this.distAdvisor.adviseGeneric(), false);
 +    //        cleanUpDestroyedTokens();
 +    //        return;
 +    //      }
 +
 +
 +    CacheDistributionAdvisor.InitialImageAdvice advice = null;
 +    boolean done = false;
 +    while(!done && !isDestroyed()) {
 +      advice = targetProvider.getInitialImageAdvice(advice);
 +      checkInitialImageForReliability(imageSrc, advice);
 +      boolean attemptGetFromOne = 
 +        imageSrc != null // we were given a specific member
 +        || this.dataPolicy.withPreloaded()
 +           && !advice.preloaded.isEmpty() // this is a preloaded region
 +        || (!advice.replicates.isEmpty());
 +      // That is: if we have 0 or 1 giiProvider then we can do a getFromOne gii;
 +      // if we have 2 or more giiProviders then we must do a getFromAll gii.
 +
 +      if (attemptGetFromOne) {
 +        if (recoverFromDisk) {
 +          if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
 +            CacheObserverHolder.getInstance().afterMarkingGIIStarted();
 +          }
 +        }
 +        {
 +          // If we have an imageSrc and the target is reinitializing mark the
 +          // getInitialImage so that it will wait until the target region is fully initialized
 +          // before responding to the get image request. Otherwise, the
 +          // source may respond with no data because it is still initializing,
 +          // e.g. loading a snapshot.
 +
 +          // Plan A: use specified imageSrc, if specified
 +          if (imageSrc != null) {
 +            try {
 +              GIIStatus ret = iiop.getFromOne(Collections.singleton(imageSrc),
 +                  targetRecreated, advice, recoverFromDisk, recovered_rvv);
 +              if (GIIStatus.didGII(ret)) {
 +                this.giiMissingRequiredRoles = false;
 +                cleanUpDestroyedTokensAndMarkGIIComplete(ret);
 +                done = true;
 +                return;
 +              }
 +            } finally {
 +              imageSrc = null;
 +            }
 +          }
 +
 +          // Plan C: use a replicate, if one exists
 +          GIIStatus ret = iiop.getFromOne(advice.replicates, false, advice, recoverFromDisk, recovered_rvv);
 +          if (GIIStatus.didGII(ret)) {
 +            cleanUpDestroyedTokensAndMarkGIIComplete(ret);
 +            done = true;
 +            return;
 +          }
 +
 +          // Plan D: if this is a PRELOADED region, fetch from another PRELOADED
 +          if (this.dataPolicy.isPreloaded()) {
 +            GIIStatus ret_preload = iiop.getFromOne(advice.preloaded, false, advice, recoverFromDisk, recovered_rvv);
 +            if (GIIStatus.didGII(ret_preload)) {
 +              cleanUpDestroyedTokensAndMarkGIIComplete(ret_preload);
 +              done = true;
 +              return;
 +            }
 +          } // isPreloaded
 +        }
 +
 +        //If we got to this point, we failed in the GII. Cleanup
 +        //any partial image we received
 +        cleanUpAfterFailedGII(recoverFromDisk);
 +
 +      } // attemptGetFromOne
 +      else {
 +        if(!isDestroyed()) {
 +          if(recoverFromDisk) {
 +            logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZED_FROM_DISK,
 +                new Object[] {this.getFullPath(), persistentId, getPersistentID()}));
 +            if(persistentId != null) {
 +              RegionLogger.logRecovery(this.getFullPath(), persistentId,
 +                  getDistributionManager().getDistributionManagerId());
 +            }
 +          } else {
 +            RegionLogger.logCreate(this.getFullPath(),
 +                getDistributionManager().getDistributionManagerId());
 +            
 +            if (getPersistentID() != null) {
 +              RegionLogger.logPersistence(this.getFullPath(),
 +                  getDistributionManager().getDistributionManagerId(),
 +                  getPersistentID());
 +              logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_NEW_PERSISTENT_REGION_CREATED,
 +                  new Object[] {this.getFullPath(), getPersistentID()}));
 +            }
 +          }
 +          
 +          /* no more union GII
 +            // do union getInitialImage
 +            Set rest = new HashSet();
 +            rest.addAll(advice.others);
 +            rest.addAll(advice.preloaded);
 +            // push profile w/ recovery flag turned off at same time that we
 +            // do a union getInitialImage
 +            boolean pushProfile = recoverFromDisk;
 +            iiop.getFromAll(rest, pushProfile);
 +           */
 +          cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
 +          done = true;
 +          return;
 +        }
 +        break;
 +      }
 +    }
 +
 +    return;
 +  }
 +  
 +  private void synchronizeWith(InternalDistributedMember target, 
 +      VersionSource idToRecover) {
 +    InitialImageOperation op = new InitialImageOperation(this, this.entries);
 +    op.synchronizeWith(target, idToRecover, null);
 +  }
 +  
 +  /**
 +   * If this region has concurrency controls enabled this will pull any missing
 +   * changes from other replicates using InitialImageOperation and a filtered
 +   * chunking protocol.
 +   */
 +  public void synchronizeForLostMember(InternalDistributedMember
 +      lostMember, VersionSource lostVersionID) {
 +    if (this.concurrencyChecksEnabled == false) {
 +      return;
 +    }
 +    CacheDistributionAdvisor advisor = getCacheDistributionAdvisor();
 +    Set<InternalDistributedMember> targets = advisor.adviseInitializedReplicates();
 +    for (InternalDistributedMember target: targets) {
 +      synchronizeWith(target, lostVersionID, lostMember);
 +    }
 +  }
 +  
 +  /**
 +   * synchronize with another member wrt messages from the given "lost" member.
 +   * This can be used when a primary bucket crashes to ensure that interrupted
 +   * message distribution is mended.
 +   */
 +  private void synchronizeWith(InternalDistributedMember target,
 +      VersionSource versionMember, InternalDistributedMember lostMember) {
 +    InitialImageOperation op = new InitialImageOperation(this, this.entries);
 +    op.synchronizeWith(target, versionMember, lostMember);
 +  }
 +
 +  /**
 +   * invoked just before an initial image is requested from another member
 +   */
 +  /** remove any partial entries received in a failed GII */
 +  protected void cleanUpAfterFailedGII(boolean recoverFromDisk) {
 +    DiskRegion dskRgn = getDiskRegion();
 +    //if we have a persistent region, instead of deleting everything on disk,
 +    //we will just reset the "recovered from disk" flag. After
 +    //the next GII we will delete these entries if they do not come
 +    //in as part of the GII.
 +    if (recoverFromDisk && dskRgn != null && dskRgn.isBackup()) {
 +      dskRgn.resetRecoveredEntries(this);
 +      return;
 +    }
 +
 +    if (!this.entries.isEmpty()) {
 +      closeEntries();
 +      if (getDiskRegion() != null) {
 +        getDiskRegion().clear(this, null);
 +      }
 +      // clear the left-members and version-tags sets in imageState
 +      getImageState().getLeftMembers();
 +      getImageState().getVersionTags();
 +      // Clear OQL indexes
 +      if (this.indexManager != null) {
 +        try {
 +          this.indexManager.rerunIndexCreationQuery();
 +        } catch (Exception ex){
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("Exception while clearing indexes after GII failure.", ex);
 +          }
 +        }
 +      }
 +    }
 +  }
 +
 +  private void initMembershipRoles()
 +  {
 +    synchronized (this.advisorListener) {
 +      // hold sync to prevent listener from changing initial members
 +      Set others = this.distAdvisor
 +          .addMembershipListenerAndAdviseGeneric(this.advisorListener);
 +      this.advisorListener.addMembers(others);
 +      // initialize missing required roles with initial member info
 +      if (getMembershipAttributes().hasRequiredRoles()) {
 +        // AdvisorListener will also sync on missingRequiredRoles
 +        synchronized (this.missingRequiredRoles) {
 +          this.missingRequiredRoles.addAll(getMembershipAttributes()
 +              .getRequiredRoles());
 +          // remove all the roles we are playing since they will never be
 +          // missing
 +          this.missingRequiredRoles.removeAll(getSystem()
 +              .getDistributedMember().getRoles());
 +          for (Iterator iter = others.iterator(); iter.hasNext();) {
 +            DistributedMember other = (DistributedMember)iter.next();
 +            this.missingRequiredRoles.removeAll(other.getRoles());
 +          }
 +        }
 +      }
 +    }
 +    if (getMembershipAttributes().hasRequiredRoles()) {
 +      // wait up to memberTimeout for required roles...
 +//      boolean requiredRolesAreMissing = false;
 +      int memberTimeout = getSystem().getConfig().getMemberTimeout();
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Waiting up to {} for required roles.", memberTimeout);
 +      }
 +      try {
 +        if (this.giiMissingRequiredRoles) {
 +          // force reliability loss and possibly resumption
 +          isInitializingThread = true;
 +          synchronized (this.advisorListener) {
 +            synchronized (this.missingRequiredRoles) {
 +              // forcing state of loss because of bad GII
 +              this.isMissingRequiredRoles = true;
 +              getCachePerfStats().incReliableRegionsMissing(1);
 +              if (getMembershipAttributes().getLossAction().isAllAccess())
 +                getCachePerfStats().incReliableRegionsMissingFullAccess(1); // rahul
 +              else if (getMembershipAttributes().getLossAction()
 +                  .isLimitedAccess())
 +                getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
 +              else if (getMembershipAttributes().getLossAction().isNoAccess())
 +                getCachePerfStats().incReliableRegionsMissingNoAccess(1);
 +              // pur code to increment the stats.
 +              if (logger.isDebugEnabled()) {
 +                logger.debug("GetInitialImage had missing required roles.");
 +              }
 +              // TODO: will this work with RECONNECT and REINITIALIZE?
 +              isInitializingThread = true;
 +              lostReliability(null, null);
 +              if (this.missingRequiredRoles.isEmpty()) {
 +                // all required roles are present so force resumption
 +                this.isMissingRequiredRoles = false;
 +                getCachePerfStats().incReliableRegionsMissing(-1);
 +                if (getMembershipAttributes().getLossAction().isAllAccess())
 +                  getCachePerfStats().incReliableRegionsMissingFullAccess(-1); // rahul
 +                else if (getMembershipAttributes().getLossAction()
 +                    .isLimitedAccess())
 +                  getCachePerfStats()
 +                      .incReliableRegionsMissingLimitedAccess(-1);
 +                else if (getMembershipAttributes().getLossAction().isNoAccess())
 +                  getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
 +                // pur code to increment the stats.
 +                boolean async = resumeReliability(null, null);
 +                if (async) {
 +                  advisorListener.destroyed = true;
 +                }
 +              }
 +            }
 +          }
 +        }
 +        else {
 +          if (!getSystem().isLoner()) {
 +            waitForRequiredRoles(memberTimeout);
 +          }
 +          synchronized (this.advisorListener) {
 +            synchronized (this.missingRequiredRoles) {
 +              if (this.missingRequiredRoles.isEmpty()) {
 +                Assert.assertTrue(!this.isMissingRequiredRoles);
 +                if (logger.isDebugEnabled()) {
 +                  logger.debug("Initialization completed with all required roles present.");
 +                }
 +              }
 +              else {
 +                // starting in state of loss...
 +                this.isMissingRequiredRoles = true;
 +                getCachePerfStats().incReliableRegionsMissing(1);
 +                if (getMembershipAttributes().getLossAction().isAllAccess())
 +                  getCachePerfStats().incReliableRegionsMissingFullAccess(1); // rahul
 +                else if (getMembershipAttributes().getLossAction()
 +                    .isLimitedAccess())
 +                  getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
 +                else if (getMembershipAttributes().getLossAction().isNoAccess())
 +                  getCachePerfStats().incReliableRegionsMissingNoAccess(1);
 +                
 +                if (logger.isDebugEnabled()) {
 +                  logger.debug("Initialization completed with missing required roles: {}", this.missingRequiredRoles);
 +                }
 +                isInitializingThread = true;
 +                lostReliability(null, null);
 +              }
 +            }
 +          }
 +        }
 +      }
 +      catch (RegionDestroyedException ignore) {
 +        // ignore to fix bug 34639 may be thrown by waitForRequiredRoles
 +      }
 +      catch (CancelException ignore) {
 +        // ignore to fix bug 34639 may be thrown by waitForRequiredRoles
 +        if (isInitializingThread) {
 +          throw ignore;
 +        }
 +      }
 +      catch (Exception e) {
 +        logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
 +      }
 +
 +    }
 +    // open latch which will allow any threads in lostReliability to proceed
 +    this.initializationLatchAfterMemberTimeout.countDown();
 +  }
 +  private boolean isRecoveryNeeded() {
 +    return getDataPolicy().withPersistence()
 +      && getDiskRegion().isRecreated();
 +  }
 +
 +  // called by InitialImageOperation to clean up destroyed tokens
 +  // release afterGetInitialImageInitializationLatch before unlocking
 +  // cleanUpLock
 +  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK")
 +  private void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus)
 +  {
 +    //We need to clean up the disk before we release the after get initial image latch
 +    DiskRegion dskRgn = getDiskRegion();
 +    if (dskRgn != null && dskRgn.isBackup()) {
 +      dskRgn.finishInitializeOwner(this, giiStatus);
 +    }
 +    ImageState is = getImageState();
 +    is.lockGII();
 +    // clear the version tag and left-members sets
 +    is.getVersionTags();
 +    is.getLeftMembers();
 +    // remove DESTROYED tokens
 +    RegionVersionVector rvv = is.getClearRegionVersionVector();
 +    try {
 +      Iterator/*<Object>*/ keysIt = getImageState().getDestroyedEntries();
 +      while (keysIt.hasNext()) {
 +        this.entries.removeIfDestroyed(keysIt.next());
 +      }
 +      if (rvv != null) {
 +        // clear any entries received in the GII that are older than the RVV versions.
 +        // this can happen if entry chunks were received prior to the clear() being
 +        // processed
 +        clearEntries(rvv);
 +      }
 +      //need to do this before we release the afterGetInitialImageLatch
 +      if(persistenceAdvisor != null) {
 +        persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
 +      }
 +    }
 +    finally {
 +      // release after gii lock first so basicDestroy will see isInitialized()
 +      // be true
 +      // when they get the cleanUp lock.
 +        try {
 +          releaseAfterGetInitialImageLatch();
 +        } finally { // make sure unlockGII is done for bug 40001
 +          is.unlockGII();
 +        }
 +    }
 +    
 +    if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
 +      CacheObserverHolder.getInstance().afterMarkingGIICompleted();
 +    }
 +    
 +    //"Initializing region {0}" which is not acompanied by a completed message. Users think thread is stuck in some operation. Hence adding this log
 +    logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_COMPLETED_0, this.getName()));
 +  }
 +
 +  /**
 +   * @see LocalRegion#basicDestroy(EntryEventImpl, boolean, Object)
 +   */
 +  @Override
 +  protected
 +  void basicDestroy(EntryEventImpl event,
 +                       boolean cacheWrite,
 +                       Object expectedOldValue)
 +  throws EntryNotFoundException, CacheWriterException, TimeoutException {
 +    //  disallow local destruction for mirrored keysvalues regions
 +    boolean invokeWriter = cacheWrite;
 +    boolean hasSeen = false;
 +    if (hasSeenEvent(event)) {
 +      hasSeen = true;
 +    }
 +    checkIfReplicatedAndLocalDestroy(event);
 +    
 +    try {
 +      if (this.requiresOneHopForMissingEntry(event)) {
 +        // bug #45704: see if a one-hop must be done for this operation
 +        RegionEntry re = getRegionEntry(event.getKey());
 +        if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
 +          if (this.srp == null) {
 +            // only assert for non-client regions.
 +            Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
 +          }
 +          if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
 +            // removeAll will send a single one-hop for empty regions.  for other missing entries
 +            // we need to get a valid version number before modifying the local cache 
 +          // TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
 +          boolean didDistribute = RemoteDestroyMessage.distribute(event, expectedOldValue, !this.generateVersionTag);
 +
 +          if (!this.generateVersionTag && !didDistribute) {
 +            throw new PersistentReplicatesOfflineException();
 +          }
 +          
 +          if (didDistribute) {
 +            if (logger.isTraceEnabled()) {
 +              logger.trace("Event after remoteDestroy operation: {}", event);
 +            }
 +            invokeWriter = false; // remote cache invoked the writer
 +            if (event.getVersionTag() == null) {
 +              // if the event wasn't applied by the one-hop replicate it will not have a version tag
 +              // and so should not be applied to this cache
 +              return;
 +            }
 +          }
 +          }
 +        }
 +      }
 +      
 +      super.basicDestroy(event, invokeWriter, expectedOldValue);
 +
 +      // if this is a destroy coming in from remote source, free up lock resources
 +      // if this is a local origin destroy, this will happen after lock is
 +      // released
 +      if (this.scope.isGlobal() && event.isOriginRemote()) {
 +        try {
 +          getLockService().freeResources(event.getKey());
 +        }
 +        catch (LockServiceDestroyedException ignore) {
 +        }
 +      }
 +  
 +      return;
 +    } 
 +    finally {
 +      if (hasSeen) {
 +        if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
 +          event.getRemoveAllOperation().addEntry(event, true);
 +        }
-         distributeDestroy(event, expectedOldValue);
-         event.invokeCallbacks(this,true, false);
++        if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++          distributeDestroy(event, expectedOldValue);
++          event.invokeCallbacks(this,true, false);
++        }
 +      }
 +    }
 +  }
 +
 +  @Override
 +  void basicDestroyPart3(RegionEntry re, EntryEventImpl event,
 +      boolean inTokenMode, boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
 +  
 +    distributeDestroy(event, expectedOldValue);
 +    super.basicDestroyPart3(re, event, inTokenMode, duringRI, invokeCallbacks, expectedOldValue);
 +  }
 +  
 +  void distributeDestroy(EntryEventImpl event, Object expectedOldValue) {
 +    if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
 +      boolean distribute = !event.getInhibitDistribution();
 +      if (distribute) {
 +        DestroyOperation op =  new DestroyOperation(event);
 +        op.distribute();
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  boolean evictDestroy(LRUEntry entry) {
 +    boolean evictDestroyWasDone = super.evictDestroy(entry);
 +    if (evictDestroyWasDone) {
 +      if (this.scope.isGlobal()) {
 +        try {
 +          getLockService().freeResources(entry.getKey());
 +        }
 +        catch (LockServiceDestroyedException ignore) {
 +        }
 +      }
 +    }
 +    return evictDestroyWasDone;
 +  }
 +
 +
 +  /**
 +   * @see LocalRegion#basicInvalidateRegion(RegionEventImpl)
 +   */
 +  @Override
 +  void basicInvalidateRegion(RegionEventImpl event)
 +  {
 +    // disallow local invalidation for replicated regions
 +    if (!event.isDistributed() && getScope().isDistributed()
 +        && getDataPolicy().withReplication()) {
 +      throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
 +    }
 +    if (shouldDistributeInvalidateRegion(event)) {
 +      distributeInvalidateRegion(event);
 +    }
 +    super.basicInvalidateRegion(event);
 +  }
 +
 +  /**
 +   * decide if InvalidateRegionOperation should be sent to peers. broken out so
 +   * that BucketRegion can override
 +   * @param event
 +   * @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise
 +   */
 +  protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
 +    return event.isDistributed() && !event.isOriginRemote();
 +  }
 +
 +  /**
 +   * Distribute the invalidate of a region given its event.
 +   * This implementation sends the invalidate to peers.
 +   * @since 5.7
 +   */
 +  protected void distributeInvalidateRegion(RegionEventImpl event) {
 +    new InvalidateRegionOperation(event).distribute();
 +  }
 +
 +  /**
 +   * @see LocalRegion#basicDestroyRegion(RegionEventImpl, boolean, boolean,
 +   *      boolean)
 +   */
 +  @Override
 +  void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite,
 +      boolean lock, boolean callbackEvents) throws CacheWriterException,
 +      TimeoutException
 +  {
 +    final String path = getFullPath();
 +    //Keep track of regions that are being destroyed. This helps avoid a race
 +    //when another member concurrently creates this region. See bug 42051.
 +    boolean isClose = event.getOperation().isClose();
 +    if(!isClose) {
 +      cache.beginDestroy(path, this);
 +    }
 +    try {
 +      super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents);
 +      // send destroy region operation even if this is a localDestroyRegion (or
 +      // close)
 +      if (!event.isOriginRemote()) {
 +        distributeDestroyRegion(event, true);
 +      } else {
 +        if(!event.isReinitializing()) {
 +          RegionEventImpl localEvent = new RegionEventImpl(this,
 +              Operation.REGION_LOCAL_DESTROY, event.getCallbackArgument(), false, getMyId(),
 +              generateEventID()/* generate EventID */);
 +          distributeDestroyRegion(localEvent, false/*fixes bug 41111*/);
 +        }
 +      }
 +      notifyBridgeClients(event);
 +    }
 +    catch (CancelException e) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("basicDestroyRegion short-circuited due to cancellation");
 +      }
 +    }
 +    finally {
 +      if(!isClose) {
 +        cache.endDestroy(path, this);
 +      }
 +      RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose);
 +    }
 + }
 +
 +
 +  @Override
 +  protected void distributeDestroyRegion(RegionEventImpl event,
 +                                         boolean notifyOfRegionDeparture) {
 +    if(persistenceAdvisor != null) {
 +      persistenceAdvisor.releaseTieLock();
 +    }
 +    new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
 +  }
 +
 +  /**
 +   * Return true if invalidation occurred; false if it did not, for example if
 +   * it was already invalidated
 +   * 
 +   * @see LocalRegion#basicInvalidate(EntryEventImpl)
 +   */
 +  @Override
 +  void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException
 +  {
 +    
 +    boolean hasSeen = false;
 +    if (hasSeenEvent(event)) {
 +      hasSeen = true;
 +    }
 +    try {
 +      // disallow local invalidation for replicated regions
 +      if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed()
 +          && getDataPolicy().withReplication()) {
 +        throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
 +      }
 +      if (this.requiresOneHopForMissingEntry(event)) {
 +        // bug #45704: see if a one-hop must be done for this operation
 +        RegionEntry re = getRegionEntry(event.getKey());
 +        if (re == null/* || re.isTombstone()*/ || !this.generateVersionTag) {
 +          if (this.srp == null) {
 +            // only assert for non-client regions.
 +            Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
 +          }
 +          // TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
 +          boolean didDistribute = RemoteInvalidateMessage.distribute(event, !this.generateVersionTag);
 +          if (!this.generateVersionTag && !didDistribute) {
 +            throw new PersistentReplicatesOfflineException();
 +          }
 +          if (didDistribute) {
 +            if (logger.isDebugEnabled()) {
 +              logger.debug("Event after remoteInvalidate operation: {}", event);
 +            }
 +            if (event.getVersionTag() == null) {
 +              // if the event wasn't applied by the one-hop replicate it will not have a version tag
 +              // and so should not be applied to this cache
 +              return;
 +            }
 +          }
 +        }
 +      }
 +  
 +      super.basicInvalidate(event);
 +
 +      return;
 +    } finally {
 +      if (hasSeen) {
-         distributeInvalidate(event);
-         event.invokeCallbacks(this,true, false);
++    	if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++          distributeInvalidate(event);
++          event.invokeCallbacks(this,true, false);
++    	}
 +      }
 +    }
 +  }
 +
 +  @Override
 +  void basicInvalidatePart3(RegionEntry re, EntryEventImpl event,
 +      boolean invokeCallbacks) {
 +    distributeInvalidate(event);
 +    super.basicInvalidatePart3(re, event, invokeCallbacks);
 +  }
 +  
 +  void distributeInvalidate(EntryEventImpl event) {
 +    if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
 +        && !isTX() /* only distribute if non-tx */) {
 +      if (event.isDistributed() && !event.isOriginRemote()) {
 +        boolean distribute = !event.getInhibitDistribution();
 +        if (distribute) {
 +          InvalidateOperation op = new InvalidateOperation(event);
 +          op.distribute();
 +        }
 +      }
 +    }
 +  }
 +
 +  
 +  @Override
 +  void basicUpdateEntryVersion(EntryEventImpl event)
 +      throws EntryNotFoundException {
 +
 +    try {
 +      if (!hasSeenEvent(event)) {
 +        super.basicUpdateEntryVersion(event);
 +      }
 +      return;
 +    } finally {
-       distributeUpdateEntryVersion(event);
++      if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++        distributeUpdateEntryVersion(event);
++      }
 +    }
 +  }
 +
 +  private void distributeUpdateEntryVersion(EntryEventImpl event) {
 +    if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
 +        && !isTX() /* only distribute if non-tx */) {
 +      if (event.isDistributed() && !event.isOriginRemote()) {
 +        UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
 +        op.distribute();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  protected void basicClear(RegionEventImpl ev)
 +  {
 +    Lock dlock = this.getRegionDistributedLockIfGlobal();
 +    try {
 +      super.basicClear(ev);
 +    }
 +    finally {
 +      if (dlock != null)
 +        dlock.unlock();
 +    }
 +  }
 +  
 +  @Override
 +  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite)  {
 +    if (this.concurrencyChecksEnabled && !this.dataPolicy.withReplication()) {
 +      boolean retry = false;
 +      do {
 +        // non-replicate regions must defer to a replicate for clear/invalidate of region
 +        Set<InternalDistributedMember> repls = this.distAdvisor.adviseReplicates();
 +        if (repls.size() > 0) {
 +          InternalDistributedMember mbr = repls.iterator().next();
 +          RemoteRegionOperation op = RemoteRegionOperation.clear(mbr, this);
 +          try {
 +            op.distribute();
 +            return;
 +          } catch (CancelException e) {
 +            this.stopper.checkCancelInProgress(e);
 +            retry = true;
 +          } catch (RemoteOperationException e) {
 +            this.stopper.checkCancelInProgress(e);
 +            retry = true;
 +          }
 +        }
 +      } while (retry);
 +    }
 +    // if no version vector or if no replicates are around, use the default mechanism
 +    super.basicClear(regionEvent, cacheWrite);
 +  }
 +  
 +  
 +  @Override
 +  void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
 +    boolean enableRVV = useRVV && this.dataPolicy.withReplication() && this.concurrencyChecksEnabled && !getDistributionManager().isLoner(); 
 +    
 +    //Fix for 46338 - apparently multiple threads from the same VM are allowed
 +    //to suspend locking, which is what distributedLockForClear() does. We don't
 +    //want that to happen, so we'll synchronize to make sure only one thread on
 +    //this member performs a clear.
 +    synchronized(clearLock) {
 +      if (enableRVV) {
 +
 +        distributedLockForClear();
 +        try {
 +          Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
 +          // pause all generation of versions and flush from the other members to this one
 +          try {
 +            obtainWriteLocksForClear(regionEvent, participants);
 +            clearRegionLocally(regionEvent, cacheWrite, null);
 +            if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
 +              DistributedClearOperation.clear(regionEvent, null, participants);
 +            }
 +          } finally {
 +            releaseWriteLocksForClear(regionEvent, participants);
 +          }
 +        }
 +        finally {
 +          distributedUnlockForClear();
 +        }
 +      } else {
 +        Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
 +        clearRegionLocally(regionEvent, cacheWrite, null);
 +        if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
 +          DistributedClearOperation.clear(regionEvent, null, participants);
 +        }
 +      }
 +    }
 +    
 +    // since clients do not maintain RVVs except for tombstone GC
 +    // we need to ensure that current ops

<TRUNCATED>