Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:36 UTC

[74/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 0000000,f3e730a..021eba6
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@@ -1,0 -1,4311 +1,4311 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache;
+ 
+ import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.CopyOnWriteArraySet;
+ import java.util.concurrent.RejectedExecutionException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.locks.Condition;
+ import java.util.concurrent.locks.Lock;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.InvalidDeltaException;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.CacheListener;
+ import com.gemstone.gemfire.cache.CacheLoader;
+ import com.gemstone.gemfire.cache.CacheLoaderException;
+ import com.gemstone.gemfire.cache.CacheWriter;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.DataPolicy;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.LossAction;
+ import com.gemstone.gemfire.cache.MembershipAttributes;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.RegionAccessException;
+ import com.gemstone.gemfire.cache.RegionAttributes;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.RegionDistributionException;
+ import com.gemstone.gemfire.cache.RegionMembershipListener;
+ import com.gemstone.gemfire.cache.ResumptionAction;
+ import com.gemstone.gemfire.cache.RoleException;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.TransactionId;
+ import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+ import com.gemstone.gemfire.cache.execute.Function;
+ import com.gemstone.gemfire.cache.execute.FunctionException;
+ import com.gemstone.gemfire.cache.execute.ResultCollector;
+ import com.gemstone.gemfire.cache.execute.ResultSender;
+ import com.gemstone.gemfire.cache.persistence.PersistentReplicatesOfflineException;
+ import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+ import com.gemstone.gemfire.cache.wan.GatewaySender;
+ import com.gemstone.gemfire.distributed.DistributedLockService;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
+ import com.gemstone.gemfire.distributed.Role;
+ import com.gemstone.gemfire.distributed.internal.DM;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.ProfileVisitor;
+ import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+ import com.gemstone.gemfire.distributed.internal.MembershipListener;
+ import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+ import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
+ import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
+ import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
+ import com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+ import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+ import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
+ import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultWaiter;
+ import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
+ import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+ import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
+ import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+ import com.gemstone.gemfire.internal.cache.persistence.CreatePersistentRegionProcessor;
+ import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisor;
+ import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisorImpl;
+ import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
+ import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberManager;
+ import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberView;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+ import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
+ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
+ import com.gemstone.gemfire.i18n.StringId;
+ /**
+  * 
+  * @author Eric Zoerner
+  * @author Sudhir Menon
+  */
+ @SuppressWarnings("deprecation")
+ public class DistributedRegion extends LocalRegion implements 
+     CacheDistributionAdvisee
+ {
+   private static final Logger logger = LogService.getLogger();
+   
+   /** causes cache profile to be added to afterRemoteRegionCreate notification for testing */
+   public static boolean TEST_HOOK_ADD_PROFILE = false;
+   
+   /** Used to sync accesses to this.dlockService to allow lazy construction */
+   private final Object dlockMonitor = new Object();
+ 
+   final CacheDistributionAdvisor distAdvisor;
+ 
+   /**
+    * @guarded.By {@link #dlockMonitor}
+    */
+   private DistributedLockService dlockService;
+ 
+   protected final AdvisorListener advisorListener = new AdvisorListener();
+ 
+   /** Set of currently missing required roles */
+   protected final HashSet missingRequiredRoles = new HashSet();
+ 
+   /** True if this region is currently missing any required roles */
+   protected volatile boolean isMissingRequiredRoles = false;
+   
+   /**
+    * True if this region is has any required roles defined and the LossAction is
+    * either NO_ACCESS or LIMITED_ACCESS. Reliability checks will only happen if
+    * this is true.
+    */
+   private final boolean requiresReliabilityCheck;
+ 
+   /**
+    * Provides a queue for reliable message delivery
+    * 
+    * @since 5.0
+    */
+   protected final ReliableMessageQueue rmq;
+ 
+   /**
+    * Latch that is opened after initialization waits for required roles up to
+    * the <a href="DistributedSystem#member-timeout">member-timeout </a>.
+    */
+   protected final StoppableCountDownLatch initializationLatchAfterMemberTimeout; 
+ 
+   private final PersistenceAdvisor persistenceAdvisor;
+   
+   private final PersistentMemberID persistentId;
+ 
+   /**
+    * This boolean is set to false when this region
+    * is non-persistent, but there are persistent members in the distributed system
+    * to which all region modifications should be forwarded
+    * see bug 45186
+    */
+   private volatile boolean generateVersionTag = true;
+ 
+   /** Tests can set this to true and ignore reliability triggered reconnects */
+   public static boolean ignoreReconnect = false;
+   
+   /**
+    * Lock to prevent multiple threads on this member from performing
+    * a clear at the same time.
+    */
+   private final Object clearLock = new Object();
+ 
+   private static AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
+ 
+   /** Creates a new instance of DistributedRegion */
+   protected DistributedRegion(String regionName, RegionAttributes attrs,
+       LocalRegion parentRegion, GemFireCacheImpl cache,
+       InternalRegionArguments internalRegionArgs) {
+     super(regionName, attrs, parentRegion, cache, internalRegionArgs);
+     this.initializationLatchAfterMemberTimeout = new StoppableCountDownLatch(
+         getCancelCriterion(), 1);
+     this.distAdvisor = createDistributionAdvisor(internalRegionArgs);
+     
+     if (getDistributionManager().getConfig().getEnableNetworkPartitionDetection()
+         && !isInternalRegion() && !attrs.getScope().isAck() && !doesNotDistribute() && attrs.getDataPolicy().withStorage()) {
+       logger.warn(LocalizedMessage.create(LocalizedStrings.DistributedRegion_REGION_0_1_SPLITBRAIN_CONFIG_WARNING,
+           new Object[] { regionName, attrs.getScope() })); 
+     }
+     if (!getDistributionManager().getConfig().getEnableNetworkPartitionDetection() 
+         && attrs.getDataPolicy().withPersistence() && !loggedNetworkPartitionWarning.getAndSet(true)) {
+       logger.warn(LocalizedMessage.create(
+           LocalizedStrings.DistributedRegion_REGION_0_ENABLE_NETWORK_PARTITION_WARNING,
+           new Object[] { regionName, attrs.getScope() }));
+     }
+ 
+     boolean setRequiresReliabilityCheck = attrs.getMembershipAttributes()
+         .hasRequiredRoles()
+         &&
+         // note that the following includes NO_ACCESS, LIMITED_ACCESS,
+         !attrs.getMembershipAttributes().getLossAction().isAllAccess()
+         && !attrs.getMembershipAttributes().getLossAction().isReconnect();
+ 
+     // this optimization is safe for as long as Roles and Required Roles are
+     // immutable
+     // if this VM fulfills all required roles, make requiresReliabilityCheck
+     // false
+     Set reqRoles = new HashSet(attrs.getMembershipAttributes()
+         .getRequiredRoles());
+     reqRoles.removeAll(getSystem().getDistributedMember().getRoles());
+     if (reqRoles.isEmpty()) {
+       setRequiresReliabilityCheck = false;
+     }
+ 
+     this.requiresReliabilityCheck = setRequiresReliabilityCheck;
+ 
+     {
+       ReliableMessageQueue tmp = null;
+       if (this.requiresReliabilityCheck) {
+         //         if
+         // (attrs.getMembershipAttributes().getLossAction().isAllAccessWithQueuing())
+         // {
+         //           tmp = cache.getReliableMessageQueueFactory().create(this);
+         //         }
+       }
+       this.rmq = tmp;
+     }
+ 
+     if(internalRegionArgs.isUsedForPartitionedRegionBucket()) {
+       this.persistenceAdvisor = internalRegionArgs.getPersistenceAdvisor();
+     } else if (this.allowsPersistence()){
+       //TODO prpersist - using this lock service is a hack. Maybe? Or maybe
+       //it's ok if we have one (rarely used) lock service for many operations?
+       //What does the resource manager do?
+       DistributedLockService dl = cache.getPartitionedRegionLockService();
+       try {
+         //TODO prpersist - this is just a quick and dirty storage mechanism so that
+         //I can test the storage.
+         DiskRegionStats diskStats;
+         PersistentMemberView storage;
+         if(getDataPolicy().withPersistence()) {
+           storage = getDiskRegion();
+           diskStats = getDiskRegion().getStats();
+         } else {
+           storage = new InMemoryPersistentMemberView();
+           diskStats = null;
+         }
+         PersistentMemberManager memberManager = cache.getPersistentMemberManager();
+         this.persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage, this.getFullPath(), diskStats, memberManager);
+       } catch (Exception e) {
+         // preserve the original failure as the cause instead of swallowing it
+         throw new InternalGemFireError("Couldn't recover persistence", e);
+       }
+     } else {
+       this.persistenceAdvisor = null;
+     }
+     if(this.persistenceAdvisor != null) {
+       this.persistentId = persistenceAdvisor.generatePersistentID();
+     } else {
+       this.persistentId = null;
+     }
+     
+   }
+   
+   @Override
+   public void createEventTracker() {
+     this.eventTracker = new EventTracker(this);
+     this.eventTracker.start();
+   }
+   
+   /**
+    * Intended for used during construction of a DistributedRegion
+    *  
+    * @return the advisor to be used by the region
+    */
+   protected CacheDistributionAdvisor createDistributionAdvisor(InternalRegionArguments internalRegionArgs) {
+     return CacheDistributionAdvisor.createCacheDistributionAdvisor(this);  // Warning: potential early escape of object before full construction
+   }
+   
+   /**
+    * Does this region support persistence?
+    */
+   public boolean allowsPersistence() {
+     return true;
+   }
+ 
+   @Override
+   public boolean requiresOneHopForMissingEntry(EntryEventImpl event) {
+     // received from another member - don't use one-hop
+     if (event.isOriginRemote()) {
+       return false; 
+     }
+     // local ops aren't distributed
+     if (event.getOperation().isLocal()) {
+       return false;
+     }
+     // if it already has a valid version tag it can go out with a DistributedCacheOperation
+     if (event.getVersionTag() != null && event.getVersionTag().getRegionVersion() > 0) {
+       return false;
+     }
+     // if we're not allowed to generate a version tag we need to send it to someone who can
+     if (!this.generateVersionTag) {
+       return true;
+     }
+     return this.concurrencyChecksEnabled &&
+         (this.srp == null) &&
+         !isTX() &&
+         this.scope.isDistributed() &&
+         !this.dataPolicy.withReplication();
+   }
+   
+   
+   /**
+    * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object, 
+    * boolean, long, boolean)
+    */
+   @Override
+   protected
+   boolean virtualPut(EntryEventImpl event,
+                      boolean ifNew,
+                      boolean ifOld,
+                      Object expectedOldValue,
+                      boolean requireOldValue,
+                      long lastModified,
+                      boolean overwriteDestroyed)
+   throws TimeoutException,
+          CacheWriterException {
+     final boolean isTraceEnabled = logger.isTraceEnabled();
+     
+     Lock dlock = null;
+     if (this.scope.isGlobal() && // lock only applies to global scope
+         !event.isOriginRemote() && // only if operation originating locally
+         !event.isNetSearch() && // search and load processor handles own locking
+         !event.isNetLoad() &&
+         // @todo darrel/kirk: what about putAll?
+         !event.isLocalLoad() &&
+         !event.isSingleHopPutOp()) { // Single Hop Op means dlock is already taken at origin node.
+       dlock = this.getDistributedLockIfGlobal(event.getKey());
+     }
+     if (isTraceEnabled) {
+       logger.trace("virtualPut invoked for event {}", event);
+     }
+     try {
+       if (!hasSeenEvent(event)) {
+         if (this.requiresOneHopForMissingEntry(event)) {
+           // bug #45704: see if a one-hop must be done for this operation
+           RegionEntry re = getRegionEntry(event.getKey());
+           if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
+             if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
+               // putAll will send a single one-hop for empty regions.  for other missing entries
+               // we need to get a valid version number before modifying the local cache 
+               boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
+                   false, false, expectedOldValue, requireOldValue, !this.generateVersionTag);
+ 
+               if (!didDistribute && isTraceEnabled) {
+                 logger.trace("Unable to perform one-hop messaging");
+               }
+               if (!this.generateVersionTag && !didDistribute) {
+                 throw new PersistentReplicatesOfflineException();
+               }
+               if (didDistribute) {
+                 if (isTraceEnabled) {
+                   logger.trace("Event after remotePut operation: {}", event);
+                 }
+                 if (event.getVersionTag() == null) {
+                   // if the event wasn't applied by the one-hop replicate it will not have a version tag
+                   // and so should not be applied to this cache
+                   return false;
+                 }
+               }
+             }
+           }
+         }
+         return super.virtualPut(event,
+                                 ifNew,
+                                 ifOld,
+                                 expectedOldValue,
+                                 requireOldValue,
+                                 lastModified,
+                                 overwriteDestroyed);
+       }
+       else {
+         if (event.getDeltaBytes() != null && event.getRawNewValue() == null) {
+           // This means that this event has delta bytes but no full value.
+           // Request the full value of this event.
+           // The value in this vm may not be same as this event's value.
+           throw new InvalidDeltaException(
+               "Cache encountered replay of event containing delta bytes for key "
+                   + event.getKey());
+         }
+         // if the listeners have already seen this event, then it has already
+         // been successfully applied to the cache. Distribute the message
+         // and return.
+         if (isTraceEnabled) {
+           logger.trace("DR.virtualPut: this cache has already seen this event {}", event);
+         }
+         
+         // Gester, Fix 39014: when hasSeenEvent, put will still distribute
+         // the event, but putAll did not. We add the logic back here, not into
+         // DR.distributeUpdate(), because we moved this part up into
+         // LR.basicPutPart3 on purpose. Reviewed by Bruce.
+         if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
+           event.getPutAllOperation().addEntry(event, true);
+         }
+ 
+         /* doing this so that other VMs will apply this no matter what. If it 
+          * is an "update" they will not apply it if they don't have the key.
+          * Because this is probably a retry, it will never get applied to this 
+          * local AbstractRegionMap, and so will never be flipped to a 'create'
+          */
+         event.makeCreate();
+         if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+           distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
+           event.invokeCallbacks(this,true, true);
+         }
+         return true;
+       }
+     }
+     finally {
+       if (dlock != null) {
+         dlock.unlock();
+       }
+     }
+   }
+   
+   @Override
+   protected RegionEntry basicPutEntry(EntryEventImpl event, long lastModified)
+       throws TimeoutException, CacheWriterException {
+     
+     final boolean isTraceEnabled = logger.isTraceEnabled();
+     
+     if (isTraceEnabled) {
+       logger.trace("basicPutEntry invoked for event {}", event);
+     }
+     if (this.requiresOneHopForMissingEntry(event)) {
+       // bug #45704: see if a one-hop must be done for this operation
+       RegionEntry re = getRegionEntry(event.getKey());
+       if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
+         final boolean ifNew = false;
+         final boolean ifOld = false;
+         boolean didDistribute = RemotePutMessage.distribute(event, lastModified,
+             ifNew, ifOld, null, false, !this.generateVersionTag);
+         if (!this.generateVersionTag && !didDistribute) {
+           throw new PersistentReplicatesOfflineException();
+         }
+         if (didDistribute && isTraceEnabled) {
+           logger.trace("Event after remotePut for basicPutEntry: {}", event);
+         }
+       }
+     }
+     return super.basicPutEntry(event, lastModified);
+   }
+ 
+   @Override
+   public void performPutAllEntry(EntryEventImpl event) {
+ 	  /*
+ 	   * force shared data view so that we just do the virtual op, accruing things in the put all operation for later
+ 	   */
+ 	if(isTX()) {
+ 		event.getPutAllOperation().addEntry(event);
+ 	} else {
+ 		getSharedDataView().putEntry(event, false, false, null, false, 0L, false);
+ 	}
+   }
+   
+   @Override
+   public void performRemoveAllEntry(EntryEventImpl event) {
+     // force shared data view so that we just do the virtual op, accruing things in the bulk operation for later
+     if(isTX()) {
+       event.getRemoveAllOperation().addEntry(event);
+     } else {
+       basicDestroy(event, true, null);
+       //getSharedDataView().destroyExistingEntry(event, true, null);
+     }
+   }
+   
+   /**
+    * distribution and listener notification
+    */
+   @Override
+   public void basicPutPart3(EntryEventImpl event, RegionEntry entry,
+       boolean isInitialized, long lastModified, boolean invokeCallbacks,
+       boolean ifNew, boolean ifOld, Object expectedOldValue,
+       boolean requireOldValue) {
+     
+     distributeUpdate(event, lastModified, false, false, null, false);
+     super.basicPutPart3(event, entry, isInitialized, lastModified,
+         invokeCallbacks, ifNew, ifOld, expectedOldValue, requireOldValue);
+   }
+ 
+   /** distribute an update operation */
+   protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
+     // an update from a netSearch is not distributed
+     if (!event.isOriginRemote() && !event.isNetSearch() && !event.isBulkOpInProgress()) {
+       boolean distribute = true;
+       if (event.getInhibitDistribution()) {
+         // this has already been distributed by a one-hop operation
+         distribute = false;
+       }
+       if (distribute) {
+         UpdateOperation op = new UpdateOperation(event, lastModified);
+         if (logger.isTraceEnabled()) {
+           logger.trace("distributing operation for event : {} : for region : {}", event, this.getName());
+         }
+         op.distribute();
+       }
+     }
+   }
+ 
+   protected void setGeneratedVersionTag(boolean generateVersionTag) {
+     // there is at least one other persistent member, so turn on concurrencyChecks
+     enableConcurrencyChecks();
+     
+     this.generateVersionTag = generateVersionTag;
+   }
+ 
+   protected boolean getGenerateVersionTag() {
+     return this.generateVersionTag;
+   }
+ 
+   @Override
+   protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
+     if (logger.isTraceEnabled()) {
+       logger.trace("shouldGenerateVersionTag this.generateVersionTag={} ccenabled={} dataPolicy={} event:{}",
+           this.generateVersionTag, this.concurrencyChecksEnabled, this.dataPolicy, event);
+     }
+     if (!this.concurrencyChecksEnabled || this.dataPolicy == DataPolicy.EMPTY || !this.generateVersionTag) {
+       return false;
+     }
+     if (this.srp != null) { // client
+       return false;
+     }
+     if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
+       return false;
+     }
+     if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag
+       return false;
+     }
+     if (!event.isOriginRemote() && this.dataPolicy.withReplication()) {
+       return true;
+     }
+     if (!this.dataPolicy.withReplication() && !this.dataPolicy.withPersistence()) {
+       if (!entry.getVersionStamp().hasValidVersion()) {
+         // do not generate a version stamp in a region that has no replication if it's not based
+         // on an existing version from a replicate region
+         return false;
+       }
+       return true;
+     }
+     if (!event.isOriginRemote() && event.getDistributedMember() != null) {
+       if (!event.getDistributedMember().equals(this.getMyId())) {
+         return event.getVersionTag() == null; // one-hop remote message
+       }
+     }
+     return false;
+   }
+   /**
+    * Throws RegionAccessException if required roles are missing and the
+    * LossAction is NO_ACCESS
+    * 
+    * @throws RegionAccessException
+    *           if required roles are missing and the LossAction is NO_ACCESS
+    */
+   @Override
+   protected void checkForNoAccess()
+   {
+     if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
+       if (getMembershipAttributes().getLossAction().isNoAccess()) {
+         synchronized (this.missingRequiredRoles) {
+           if (!this.isMissingRequiredRoles)
+             return;
+           Set roles = Collections.unmodifiableSet(new HashSet(
+               this.missingRequiredRoles));
+           throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1.toLocalizedString(new Object[] {getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
+         }
+       }
+     }
+   }
+ 
+   /**
+    * Throws RegionAccessException is required roles are missing and the
+    * LossAction is either NO_ACCESS or LIMITED_ACCESS.
+    * 
+    * @throws RegionAccessException
+    *           if required roles are missing and the LossAction is either
+    *           NO_ACCESS or LIMITED_ACCESS
+    */
+   @Override
+   protected void checkForLimitedOrNoAccess()
+   {
+     if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
+       if (getMembershipAttributes().getLossAction().isNoAccess()
+           || getMembershipAttributes().getLossAction().isLimitedAccess()) {
+         synchronized (this.missingRequiredRoles) {
+           if (!this.isMissingRequiredRoles)
+             return;
+           Set roles = Collections.unmodifiableSet(new HashSet(
+               this.missingRequiredRoles));
+           Assert.assertTrue(!roles.isEmpty());
+           throw new RegionAccessException(LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1
+               .toLocalizedString(new Object[] { getMembershipAttributes().getLossAction(), roles}), getFullPath(), roles);
+         }
+       }
+     }
+   }
+   
+   @Override
+   protected void handleReliableDistribution(ReliableDistributionData data,
+       Set successfulRecipients) {
+     handleReliableDistribution(data, successfulRecipients,
+         Collections.EMPTY_SET, Collections.EMPTY_SET);
+   }
+ 
+   protected void handleReliableDistribution(ReliableDistributionData data,
+       Set successfulRecipients, Set otherRecipients1, Set otherRecipients2)
+   {
+     if (this.requiresReliabilityCheck) {
+       MembershipAttributes ra = getMembershipAttributes();
+       Set recipients = successfulRecipients;
+       // determine the successful roles
+       Set roles = new HashSet();
+       for (Iterator iter = recipients.iterator(); iter.hasNext();) {
+         InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
+         if (mbr != null) {
+           roles.addAll(mbr.getRoles());
+         }
+       }
+       for (Iterator iter = otherRecipients1.iterator(); iter.hasNext();) {
+         InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
+         if (mbr != null) {
+           roles.addAll(mbr.getRoles());
+         }
+       }
+       for (Iterator iter = otherRecipients2.iterator(); iter.hasNext();) {
+         InternalDistributedMember mbr = (InternalDistributedMember)iter.next();
+         if (mbr != null) {
+           roles.addAll(mbr.getRoles());
+         }
+       }
+       // determine the missing roles
+       Set failedRoles = new HashSet(ra.getRequiredRoles());
+       failedRoles.removeAll(roles);
+       if (failedRoles.isEmpty())
+         return;
+ //       if (rp.isAllAccessWithQueuing()) {
+ //         this.rmq.add(data, failedRoles);
+ //       } else {
+ 
+       throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_MAY_HAVE_FAILED_TO_NOTIFY_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
+ //       }
+     }
+   }
+ 
+   /**
+    * 
+    * Called when we do a distributed operation and don't have anyone to
+    * distributed it too. Since this is only called when no distribution was done
+    * (i.e. no recipients) we do not check isMissingRequiredRoles because it
+    * might not longer be true due to race conditions
+    * 
+    * @return false if this region has at least one required role and queuing is
+    *         configured. Returns true if sending to no one is ok.
+    * @throws RoleException
+    *           if a required role is missing and the LossAction is either
+    *           NO_ACCESS or LIMITED_ACCESS.
+    * @since 5.0
+    */
+   protected boolean isNoDistributionOk()
+   {
+     if (this.requiresReliabilityCheck) {
+       MembershipAttributes ra = getMembershipAttributes();
+       //       if (ra.getLossAction().isAllAccessWithQueuing()) {
+       //         return !ra.hasRequiredRoles();
+       //       } else {
+       Set failedRoles = ra.getRequiredRoles();
+       throw new RegionDistributionException(LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_WAS_NOT_DONE_TO_THESE_REQUIRED_ROLES_0.toLocalizedString(failedRoles), getFullPath(), failedRoles);
+       //       }
+     }
+     return true;
+   }
+   
+   /**
+    * returns true if this Region does not distribute its operations to other
+    * members.
+    * @since 6.0
+    * @see HARegion#localDestroyNoCallbacks(Object)
+    */
+   public boolean doesNotDistribute() {
+     return false;
+   }
+ 
+   
+   @Override
+   public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
+     return !doesNotDistribute() && super.shouldSyncForCrashedMember(id);
+   }
+   
+   
+   /**
+    * Adjust the specified set of recipients by removing any of them that are
+    * currently having their data queued.
+    * 
+    * @param recipients
+    *          the set of recipients that a message is to be distributed too.
+    *          Recipients that are currently having their data queued will be
+    *          removed from this set.
+    * @return the set, possibly null, of recipients that are currently having
+    *         their data queued.
+    * @since 5.0
+    */
+   protected Set adjustForQueuing(Set recipients)
+   {
+     Set result = null;
+     //     if (this.requiresReliabilityCheck) {
+     //       MembershipAttributes ra = getMembershipAttributes();
+     //       if (ra.getLossAction().isAllAccessWithQueuing()) {
+     //         Set currentQueuedRoles = this.rmq.getQueuingRoles();
+     //         if (currentQueuedRoles != null) {
+     //           // foreach recipient see if any of his roles are queued and if
+     //           // they are remove him from recipients and add him to result
+     //           Iterator it = recipients.iterator();
+     //           while (it.hasNext()) {
+     //             DistributedMember dm = (DistributedMember)it.next();
+     //             Set dmRoles = dm.getRoles();
+     //             if (!dmRoles.isEmpty()) {
+     //               if (intersects(dmRoles, currentQueuedRoles)) {
+     //                 it.remove(); // fix for bug 34447
+     //                 if (result == null) {
+     //                   result = new HashSet();
+     //                 }
+     //                 result.add(dm);
+     //               }
+     //             }
+     //           }
+     //         }
+     //       }
+     //     }
+     return result;
+   }
+ 
+   /**
+    * Returns true if the two sets intersect
+    * 
+    * @param a
+    *          a non-null non-empty set
+    * @param b
+    *          a non-null non-empty set
+    * @return true if sets a and b intersect; false if not
+    * @since 5.0
+    */
+   public static boolean intersects(Set a, Set b)
+   {
+     Iterator it;
+     Set target;
+     if (a.size() <= b.size()) {
+       it = a.iterator();
+       target = b;
+     }
+     else {
+       it = b.iterator();
+       target = a;
+     }
+     while (it.hasNext()) {
+       if (target.contains(it.next()))
+         return true;
+     }
+     return false;
+   }
+ 
+   @Override
+   public boolean requiresReliabilityCheck()
+   {
+     return this.requiresReliabilityCheck;
+   }
+ 
+   /**
+    * Returns true if the ExpiryTask is currently allowed to expire.
+    * <p>
+    * If the region is in NO_ACCESS due to reliability configuration, then no
+    * expiration actions are allowed.
+    * <p>
+    * If the region is in LIMITED_ACCESS due to reliability configuration, then
+    * only non-distributed expiration actions are allowed.
+    */
+   @Override
+   protected boolean isExpirationAllowed(ExpiryTask expiry)
+   {
+     if (this.requiresReliabilityCheck && this.isMissingRequiredRoles) {
+       if (getMembershipAttributes().getLossAction().isNoAccess()) {
+         return false;
+       }
+       if (getMembershipAttributes().getLossAction().isLimitedAccess()
+           && expiry.isDistributedAction()) {
+         return false;
+       }
+     }
+     return true;
+   }
+ 
+   /**
+    * Performs the resumption action when reliability is resumed.
+    * 
+    * @return true if asynchronous resumption is triggered
+    */
+   protected boolean resumeReliability(InternalDistributedMember id,
+       Set newlyAcquiredRoles)
+   {
+     boolean async = false;
+     try {
+       ResumptionAction ra = getMembershipAttributes().getResumptionAction();
+       if (ra.isNone()) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("Reliability resumption for action of none");
+         }
+         resumeExpiration();
+       }
+       else if (ra.isReinitialize()) {
+         async = true;
+         asyncResumeReliability(id, newlyAcquiredRoles);
+       }
+     }
+     catch (Exception e) {
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+     }
+     return async;
+   }
+ 
+   /**
+    * Handles asynchronous ResumptionActions such as region reinitialize.
+    */
+   private void asyncResumeReliability(final InternalDistributedMember id,
+                                       final Set newlyAcquiredRoles)
+                                throws RejectedExecutionException {
+     final ResumptionAction ra = getMembershipAttributes().getResumptionAction();
+     getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+       public void run()
+       {
+         try {
+           if (ra.isReinitialize()) {
+             if (logger.isDebugEnabled()) {
+               logger.debug("Reliability resumption for action of reinitialize");
+             }
+             if (!isDestroyed() && !cache.isClosed()) {
+               RegionEventImpl event = new RegionEventImpl(
+                   DistributedRegion.this, Operation.REGION_REINITIALIZE, null,
+                   false, getMyId(), generateEventID());
+               reinitialize(null, event);
+             }
+             synchronized (missingRequiredRoles) {
+               // any number of threads may be waiting on missingRequiredRoles
+               missingRequiredRoles.notifyAll();
+               if (hasListener() && id != null) {
+                 // fire afterRoleGain event
+                 RoleEventImpl relEvent = new RoleEventImpl(
+                     DistributedRegion.this, Operation.REGION_CREATE, null,
+                     true, id, newlyAcquiredRoles);
+                 dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_GAIN,
+                     relEvent);
+               }
+             }
+           }
+         }
+         catch (Exception e) {
+           logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+         }
+       }
+     });
+   }
+ 
+   /** Reschedules expiry tasks when reliability is resumed. */
+   private void resumeExpiration()
+   {
+     boolean isNoAccess = getMembershipAttributes().getLossAction().isNoAccess();
+     boolean isLimitedAccess = getMembershipAttributes().getLossAction()
+         .isLimitedAccess();
+     if (!(isNoAccess || isLimitedAccess)) {
+       return; // early out: expiration was never affected by reliability
+     }
+ 
+     if (getEntryTimeToLive().getTimeout() > 0
+         && (isNoAccess || (isLimitedAccess && getEntryTimeToLive().getAction()
+             .isDistributed()))) {
+       rescheduleEntryExpiryTasks();
+     }
+     else 
+     if (getEntryIdleTimeout().getTimeout() > 0
+         && (isNoAccess || (isLimitedAccess && getEntryIdleTimeout().getAction()
+             .isDistributed()))) {
+       rescheduleEntryExpiryTasks();
+     }
+     else
+     if (getCustomEntryTimeToLive() != null || getCustomEntryIdleTimeout() != null) {
+       // Force all entries to be rescheduled
+       rescheduleEntryExpiryTasks();
+     }
+ 
+     if (getRegionTimeToLive().getTimeout() > 0
+         && (isNoAccess || (isLimitedAccess && getRegionTimeToLive().getAction()
+             .isDistributed()))) {
+       addTTLExpiryTask();
+     }
+     if (getRegionIdleTimeout().getTimeout() > 0
+         && (isNoAccess || (isLimitedAccess && getRegionIdleTimeout()
+             .getAction().isDistributed()))) {
+       addIdleExpiryTask();
+     }
+   }
+ 
+   /**
+    * A boolean used to indicate if its the intialization time i.e the
+    * distributed Region is created for the first time. The variable is used at
+    * the time of lost reliablility.
+    */
+   private boolean isInitializingThread = false;
+ 
+   /**
+    * Called when reliability is lost. If MembershipAttributes are configured
+    * with {@link LossAction#RECONNECT}then DistributedSystem reconnect will be
+    * called asynchronously.
+    * 
+    * @return true if asynchronous resumption is triggered
+    */
+   protected boolean lostReliability(final InternalDistributedMember id,
+       final Set newlyMissingRoles)
+   {
+     if (DistributedRegion.ignoreReconnect)
+       return false;
+     boolean async = false;
+     try {
+       if (getMembershipAttributes().getLossAction().isReconnect()) {
+         async = true;
+         doLostReliability(isInitializingThread, id, newlyMissingRoles);
+         // we don't do this in the waiting pool because we're going to
+         // disconnect
+         // the distributed system, and it will wait for the pool to empty
+         /*
+          * moved to a new method called doLostReliability. Thread t = new
+          * Thread("Reconnect Distributed System") { public void run() { try { //
+          * TODO: may need to check isReconnecting and checkReadiness...
+          * initializationLatchAfterMemberTimeout.await(); // TODO:
+          * call reconnect here
+          * getSystem().tryReconnect((GemFireCache)getCache()); // added for
+          * reconnect. synchronized (missingRequiredRoles) { // any number of
+          * threads may be waiting on missingRequiredRoles
+          * missingRequiredRoles.notifyAll(); // need to fire an event if id is
+          * not null if (hasListener() && id != null) { RoleEventImpl relEvent =
+          * new RoleEventImpl( DistributedRegion.this, Operation.CACHE_RECONNECT,
+          * null, true, id, newlyMissingRoles); dispatchListenerEvent(
+          * EnumListenerEvent.AFTER_ROLE_LOSS, relEvent); } } } catch (Exception
+          * e) { } } };
+          * t.setDaemon(true); t.start();
+          */
+       }
+     }
+     catch (CancelException cce) {
+       throw cce;
+     }
+     catch (Exception e) {
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+     }
+     return async;
+   }
+ 
+   private void doLostReliability(boolean isInitializing,
+       final InternalDistributedMember id, final Set newlyMissingRoles)
+   {
+     try {
+       if (!isInitializing) {
+         // moved code to a new thread.
+         Thread t = new Thread(LocalizedStrings.DistributedRegion_RECONNECT_DISTRIBUTED_SYSTEM.toLocalizedString()) {
+           @Override
+           public void run()
+           {
+             try {
+               // TODO: may need to check isReconnecting and checkReadiness...
+               if (logger.isDebugEnabled()) {
+                 logger.debug("Reliability loss with policy of reconnect and membership thread doing reconnect");
+               }
+               initializationLatchAfterMemberTimeout.await();
+               getSystem().tryReconnect(false, "Role Loss", getCache());
+               synchronized (missingRequiredRoles) {
+                 // any number of threads may be waiting on missingRequiredRoles
+                 missingRequiredRoles.notifyAll();
+                 // need to fire an event if id is not null
+                 if (hasListener() && id != null) {
+                   RoleEventImpl relEvent = new RoleEventImpl(
+                       DistributedRegion.this, Operation.CACHE_RECONNECT, null,
+                       true, id, newlyMissingRoles);
+                   dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS,
+                       relEvent);
+                 }
+               }
+             }
+             catch (Exception e) {
+               logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+             }
+           }
+         };
+         t.setDaemon(true);
+         t.start();
+ 
+       }
+       else {
+         getSystem().tryReconnect(false, "Role Loss", getCache()); // added for
+         // reconnect.
+         synchronized (missingRequiredRoles) {
+           // any number of threads may be waiting on missingRequiredRoles
+           missingRequiredRoles.notifyAll();
+           // need to fire an event if id is not null
+           if (hasListener() && id != null) {
+             RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
+                 Operation.CACHE_RECONNECT, null, true, id, newlyMissingRoles);
+             dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
+           }
+         }
+         // } catch (CancelException cce){
+ 
+         // }
+ 
+       }
+     }
+     catch (CancelException cce) {
+       throw cce;
+     }
+     catch (Exception e) {
+       logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+     }
+ 
+   }
+ 
+   protected void lockCheckReadiness()
+   {
+     // fix for bug 32610
+     cache.getCancelCriterion().checkCancelInProgress(null);
+     checkReadiness();
+   }
+ 
+   @Override
+   public final Object validatedDestroy(Object key, EntryEventImpl event)
+       throws TimeoutException, EntryNotFoundException, CacheWriterException {
+     Lock dlock = this.getDistributedLockIfGlobal(key);
+     try {
+       return super.validatedDestroy(key, event);
+     } finally {
+       if (dlock != null) {
+         dlock.unlock();
+       }
+     }
+   }
+ 
+   /**
+    * @see LocalRegion#localDestroyNoCallbacks(Object)
+    */
+   @Override
+   public void localDestroyNoCallbacks(Object key)
+   {
+     super.localDestroyNoCallbacks(key);
+     if (getScope().isGlobal()) {
+       try {
+         this.getLockService().freeResources(key);
+       }
+       catch (LockServiceDestroyedException ignore) {
+       }
+     }
+   }
+ 
+   /**
+    * @see LocalRegion#localDestroy(Object, Object)
+    */
+   @Override
+   public void localDestroy(Object key, Object aCallbackArgument)
+       throws EntryNotFoundException
+   {
+     super.localDestroy(key, aCallbackArgument);
+     if (getScope().isGlobal()) {
+       try {
+         this.getLockService().freeResources(key);
+       }
+       catch (LockServiceDestroyedException ignore) {
+       }
+     }
+   }
+ 
+   /**
+    * @see LocalRegion#invalidate(Object, Object)
+    */
+   @Override
+   public void invalidate(Object key, Object aCallbackArgument)
+       throws TimeoutException, EntryNotFoundException
+   {
+     validateKey(key);
+     validateCallbackArg(aCallbackArgument);
+     checkReadiness();
+     checkForLimitedOrNoAccess();
+     Lock dlock = this.getDistributedLockIfGlobal(key);
+     try {
+       super.validatedInvalidate(key, aCallbackArgument);
+     }
+     finally {
+       if (dlock != null)
+         dlock.unlock();
+     }
+   }
+ 
+   @Override
+   public Lock getRegionDistributedLock() throws IllegalStateException
+   {
+     lockCheckReadiness();
+     checkForLimitedOrNoAccess();
+     if (!this.scope.isGlobal()) {
+       throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
+     }
+     return new RegionDistributedLock();
+   }
+ 
+   @Override
+   public Lock getDistributedLock(Object key) throws IllegalStateException
+   {
+     validateKey(key);
+     lockCheckReadiness();
+     checkForLimitedOrNoAccess();
+     if (!this.scope.isGlobal()) {
+       throw new IllegalStateException(LocalizedStrings.DistributedRegion_DISTRIBUTION_LOCKS_ARE_ONLY_SUPPORTED_FOR_REGIONS_WITH_GLOBAL_SCOPE_NOT_0.toLocalizedString(this.scope));
+     }
+     if (isLockingSuspendedByCurrentThread()) {
+       throw new IllegalStateException(LocalizedStrings.DistributedRegion_THIS_THREAD_HAS_SUSPENDED_ALL_LOCKING_FOR_THIS_REGION.toLocalizedString());
+     }
+     return new DistributedLock(key);
+   }
+ 
+   /**
+    * Called while NOT holding lock on parent's subregions
+    * 
+    * @throws IllegalStateException
+    *           if region is not compatible with a region in another VM.
+    * 
+    * @see LocalRegion#initialize(InputStream, InternalDistributedMember, InternalRegionArguments)
+    */
+   @Override
+   protected void initialize(InputStream snapshotInputStream,
+       InternalDistributedMember imageTarget, InternalRegionArguments internalRegionArgs) throws TimeoutException,
+       IOException, ClassNotFoundException
+   {
+     Assert.assertTrue(!isInitialized());
+     if (logger.isDebugEnabled()) {
+       logger.debug("DistributedRegion.initialize BEGIN: {}", getFullPath());
+     }
+ 
+     // if we're versioning entries we need a region-level version vector
+     if (this.scope.isDistributed() && this.concurrencyChecksEnabled) {
+       createVersionVector();
+     }
+ 
+     if (this.scope.isGlobal()) {
+       getLockService(); // create lock service eagerly now
+     }
+ 
+     final IndexUpdater indexUpdater = getIndexUpdater();
+     boolean sqlfGIILockTaken = false;
+     // this try block is to release the SQLF GII lock in the finally clause,
+     // which should be done after the bucket status has been set properly
+     // in LocalRegion#initialize()
+     try {
+      try {
+       try {
+         // take the GII lock to avoid missing entries while updating the
+         // index list for SQLFabric (#41330 and others)
+         if (indexUpdater != null) {
+           indexUpdater.lockForGII();
+           sqlfGIILockTaken = true;
+         }
+         
+         PersistentMemberID persistentId = null;
+         boolean recoverFromDisk = isRecoveryNeeded();
+         DiskRegion dskRgn = getDiskRegion();
+         if (recoverFromDisk) {
+           if (logger.isDebugEnabled()) {
+             logger.debug("DistributedRegion.getInitialImageAndRecovery: Starting Recovery");
+           }
+           dskRgn.initializeOwner(this); // do recovery
+           if (logger.isDebugEnabled()) {
+             logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery");
+           }
+           persistentId = dskRgn.getMyPersistentID();
+         }
+         
+         // Create OQL indexes before starting GII.
+         createOQLIndexes(internalRegionArgs, recoverFromDisk);
+  
+         if (getDataPolicy().withReplication()
+             || getDataPolicy().withPreloaded()) {
+           getInitialImageAndRecovery(snapshotInputStream, imageTarget,
+               internalRegionArgs, recoverFromDisk, persistentId);
+         }
+         else {
+           new CreateRegionProcessor(this).initializeRegion();
+           if (snapshotInputStream != null) {
+             releaseBeforeGetInitialImageLatch();
+             loadSnapshotDuringInitialization(snapshotInputStream);
+           }
+         }
+       }
+       catch (DiskAccessException dae) {
+         this.handleDiskAccessException(dae, true);
+         throw dae;
+       }
+ 
+       initMembershipRoles();
+       isInitializingThread = false;
+       super.initialize(null, null, null); // makes sure all latches are released if they haven't been already
+      } finally {
+       if (this.eventTracker != null) {
+         this.eventTracker.setInitialized();
+       }
+      }
+     } finally {
+       if (sqlfGIILockTaken) {
+         indexUpdater.unlockForGII();
+       }
+     }
+   }
+ 
+   @Override
+   public void initialized() {
+     new UpdateAttributesProcessor(this).distribute(false);
+   }
+ 
+   /** True if GII was impacted by missing required roles */
+   private boolean giiMissingRequiredRoles = false;
+ 
+   /**
+    * A reference counter to protected the memoryThresholdReached boolean
+    */
+   private final Set<DistributedMember> memoryThresholdReachedMembers =
+     new CopyOnWriteArraySet<DistributedMember>();
+ 
+   private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+ 
+   /** Sets and returns giiMissingRequiredRoles */
+   private boolean checkInitialImageForReliability(
+       InternalDistributedMember imageTarget,
+       CacheDistributionAdvisor.InitialImageAdvice advice)
+   {
+     // assumption: required roles are interesting to GII only if Reinitialize...
+ //    if (true)
+       return false;
+ //    if (getMembershipAttributes().hasRequiredRoles()
+ //        && getMembershipAttributes().getResumptionAction().isReinitialize()) {
+ //      // are any required roles missing for GII with Reinitialize?
+ //      Set missingRR = new HashSet(getMembershipAttributes().getRequiredRoles());
+ //      missingRR.removeAll(getSystem().getDistributedMember().getRoles());
+ //      for (Iterator iter = advice.replicates.iterator(); iter.hasNext();) {
+ //        DistributedMember member = (DistributedMember)iter.next();
+ //        missingRR.removeAll(member.getRoles());
+ //      }
+ //      for (Iterator iter = advice.others.iterator(); iter.hasNext();) {
+ //        DistributedMember member = (DistributedMember)iter.next();
+ //        missingRR.removeAll(member.getRoles());
+ //      }
+ //      for (Iterator iter = advice.preloaded.iterator(); iter.hasNext();) {
+ //        DistributedMember member = (DistributedMember)iter.next();
+ //        missingRR.removeAll(member.getRoles());
+ //      }
+ //      if (!missingRR.isEmpty()) {
+ //        // entering immediate loss condition, which will cause reinit on resume
+ //        this.giiMissingRequiredRoles = true;
+ //      }
+ //    }
+ //    return this.giiMissingRequiredRoles;
+   }
+ 
+   private void getInitialImageAndRecovery(InputStream snapshotInputStream,
+       InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs,
+       boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException
+   {
+     logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_0, this.getName()));
+   
+     ImageState imgState = getImageState();
+     imgState.init();
+     boolean targetRecreated = internalRegionArgs.getRecreateFlag();
+     Boolean isCBool = (Boolean) isConversion.get();
+     boolean isForConversion = isCBool != null ? isCBool.booleanValue() : false;
+ 
+     if (recoverFromDisk && snapshotInputStream != null && !isForConversion) {
+       throw new InternalGemFireError(LocalizedStrings.DistributedRegion_IF_LOADING_A_SNAPSHOT_THEN_SHOULD_NOT_BE_RECOVERING_ISRECOVERING_0_SNAPSHOTSTREAM_1.toLocalizedString(new Object[] {Boolean.valueOf(recoverFromDisk), snapshotInputStream}));
+     }
+ 
+     ProfileExchangeProcessor targetProvider;
+     if (dataPolicy.withPersistence()) {
+       targetProvider = new CreatePersistentRegionProcessor(this,
+           getPersistenceAdvisor(), recoverFromDisk);
+     }
+     else {
+       // this will go in the advisor profile
+       targetProvider = new CreateRegionProcessor(this);
+     }
+     imgState.setInRecovery(false);
+     RegionVersionVector recovered_rvv = null;
+     if (dataPolicy.withPersistence()) {
+       recovered_rvv = (this.getVersionVector() == null) ? null
+           : this.getVersionVector().getCloneForTransmission();
+     }
+     // initializeRegion will send out our profile
+     targetProvider.initializeRegion();
+     
+     if(persistenceAdvisor != null) {
+       persistenceAdvisor.initialize();
+     }
+     
+     // Register listener here so that the remote members are known
+     // since registering calls initializeCriticalMembers (which needs to know about
+     // remote members
+     if (!isInternalRegion()) {
+       if (!this.isDestroyed) {
+         cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
+       }
+     }
+     
+     releaseBeforeGetInitialImageLatch();
+ 
+     // allow GII to invoke test hooks.  Do this just after releasing the
+     // before-gii latch for bug #48962.  See ConcurrentLeaveDuringGIIDUnitTest
+     InitialImageOperation.beforeGetInitialImage(this);
+     
+     if (snapshotInputStream != null) {
+       try {
+         if (logger.isDebugEnabled()) {
+           logger.debug("DistributedRegion.getInitialImageAndRecovery: About to load snapshot, isInitialized={}; {}",
+               isInitialized(), getFullPath());
+         }
+         loadSnapshotDuringInitialization(snapshotInputStream);
+       }
+       catch (IOException | ClassNotFoundException e) {
+         throw new RuntimeException(e); // @todo change this exception?
+       }
+       cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
+       return;
+     }
+     
+     // No snapshot provided, use the imageTarget(s)
+ 
+     // if we were given a recommended imageTarget, use that first, and
+     // treat it like it is a replicate (regardless of whether it actually is
+     // or not)
+ 
+     InitialImageOperation iiop = new InitialImageOperation(this, this.entries);
+     // [defunct] Special case GII for PR admin regions (which are always
+     // replicates and always writers).
+     // bruce: this was commented out after adding the GIIAckRequest logic to
+     // force consistency before the gii operation begins
+     //      if (isUsedForPartitionedRegionAdmin() ||
+     // isUsedForPartitionedRegionBucket()) {
+     //        releaseBeforeGetInitialImageLatch();
+     //        iiop.getFromAll(this.distAdvisor.adviseGeneric(), false);
+     //        cleanUpDestroyedTokens();
+     //        return;
+     //      }
+ 
+ 
+     CacheDistributionAdvisor.InitialImageAdvice advice = null;
+     boolean done = false;
+     while(!done && !isDestroyed()) {
+       advice = targetProvider.getInitialImageAdvice(advice);
+       checkInitialImageForReliability(imageSrc, advice);
+       boolean attemptGetFromOne =
+           imageSrc != null // we were given a specific member
+           || (this.dataPolicy.withPreloaded()
+               && !advice.preloaded.isEmpty()) // this is a preloaded region
+           || !advice.replicates.isEmpty();
+       // That is: if we have 0 or 1 giiProvider then we can do a getFromOne gii;
+       // if we have 2 or more giiProviders then we must do a getFromAll gii.
+ 
+       if (attemptGetFromOne) {
+         if (recoverFromDisk) {
+           if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
+             CacheObserverHolder.getInstance().afterMarkingGIIStarted();
+           }
+         }
+         {
+           // If we have an imageSrc and the target is reinitializing mark the
+           // getInitialImage so that it will wait until the target region is fully initialized
+           // before responding to the get image request. Otherwise, the
+           // source may respond with no data because it is still initializing,
+           // e.g. loading a snapshot.
+ 
+           // Plan A: use specified imageSrc, if specified
+           if (imageSrc != null) {
+             try {
+               GIIStatus ret = iiop.getFromOne(Collections.singleton(imageSrc),
+                   targetRecreated, advice, recoverFromDisk, recovered_rvv);
+               if (GIIStatus.didGII(ret)) {
+                 this.giiMissingRequiredRoles = false;
+                 cleanUpDestroyedTokensAndMarkGIIComplete(ret);
+                 done = true;
+                 return;
+               }
+             } finally {
+               imageSrc = null;
+             }
+           }
+ 
+           // Plan C: use a replicate, if one exists
+           GIIStatus ret = iiop.getFromOne(advice.replicates, false, advice, recoverFromDisk, recovered_rvv);
+           if (GIIStatus.didGII(ret)) {
+             cleanUpDestroyedTokensAndMarkGIIComplete(ret);
+             done = true;
+             return;
+           }
+ 
+           // Plan D: if this is a PRELOADED region, fetch from another PRELOADED
+           if (this.dataPolicy.isPreloaded()) {
+             GIIStatus ret_preload = iiop.getFromOne(advice.preloaded, false, advice, recoverFromDisk, recovered_rvv);
+             if (GIIStatus.didGII(ret_preload)) {
+               cleanUpDestroyedTokensAndMarkGIIComplete(ret_preload);
+               done = true;
+               return;
+             }
+           } // isPreloaded
+         }
+ 
+         // If we got to this point, we failed in the GII. Clean up
+         // any partial image we received.
+         cleanUpAfterFailedGII(recoverFromDisk);
+ 
+       } // attemptGetFromOne
+       else {
+         if(!isDestroyed()) {
+           if(recoverFromDisk) {
+             logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZED_FROM_DISK,
+                 new Object[] {this.getFullPath(), persistentId, getPersistentID()}));
+             if(persistentId != null) {
+               RegionLogger.logRecovery(this.getFullPath(), persistentId,
+                   getDistributionManager().getDistributionManagerId());
+             }
+           } else {
+             RegionLogger.logCreate(this.getFullPath(),
+                 getDistributionManager().getDistributionManagerId());
+             
+             if (getPersistentID() != null) {
+               RegionLogger.logPersistence(this.getFullPath(),
+                   getDistributionManager().getDistributionManagerId(),
+                   getPersistentID());
+               logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_NEW_PERSISTENT_REGION_CREATED,
+                   new Object[] {this.getFullPath(), getPersistentID()}));
+             }
+           }
+           
+           /* no more union GII
+             // do union getInitialImage
+             Set rest = new HashSet();
+             rest.addAll(advice.others);
+             rest.addAll(advice.preloaded);
+             // push profile w/ recovery flag turned off at same time that we
+             // do a union getInitialImage
+             boolean pushProfile = recoverFromDisk;
+             iiop.getFromAll(rest, pushProfile);
+            */
+           cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
+           done = true;
+           return;
+         }
+         break;
+       }
+     }
+ 
+     return;
+   }
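+ 
+   // Provider-selection order used by getInitialImageAndRecovery, following
+   // the "Plan" comments above (the numbering there skips a Plan B):
+   //
+   //   Plan A: the caller-specified imageSrc, treated like a replicate
+   //   Plan C: any replicate from the current advice
+   //   Plan D: for PRELOADED regions only, another preloaded member
+   //
+   // If every plan fails, cleanUpAfterFailedGII discards the partial image
+   // and the loop asks the targetProvider for fresh advice.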
+   
+   private void synchronizeWith(InternalDistributedMember target, 
+       VersionSource idToRecover) {
+     InitialImageOperation op = new InitialImageOperation(this, this.entries);
+     op.synchronizeWith(target, idToRecover, null);
+   }
+   
+   /**
+    * If this region has concurrency controls enabled, this will pull any
+    * missing changes from other replicates using InitialImageOperation and a
+    * filtered chunking protocol; a usage sketch follows the method body.
+    */
+   public void synchronizeForLostMember(InternalDistributedMember
+       lostMember, VersionSource lostVersionID) {
+     if (!this.concurrencyChecksEnabled) {
+       return;
+     }
+     CacheDistributionAdvisor advisor = getCacheDistributionAdvisor();
+     Set<InternalDistributedMember> targets = advisor.adviseInitializedReplicates();
+     for (InternalDistributedMember target: targets) {
+       synchronizeWith(target, lostVersionID, lostMember);
+     }
+   }
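+ 
+   // A minimal usage sketch (illustrative only; the caller and variable names
+   // here are hypothetical, not part of this class). A membership listener
+   // reacting to a crash would resolve the lost member's version source and
+   // hand both to this region:
+   //
+   //   InternalDistributedMember crashedMember = ...; // from the listener
+   //   VersionSource lostId = ...; // the crashed member's version source
+   //   region.synchronizeForLostMember(crashedMember, lostId);
+   //
+   // Internally this fans one InitialImageOperation.synchronizeWith call out
+   // to each initialized replicate advised by the CacheDistributionAdvisor.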
+   
+   /**
+    * Synchronize with another member with respect to messages from the given
+    * "lost" member. This can be used when a primary bucket crashes to ensure
+    * that interrupted message distribution is mended.
+    */
+   private void synchronizeWith(InternalDistributedMember target,
+       VersionSource versionMember, InternalDistributedMember lostMember) {
+     InitialImageOperation op = new InitialImageOperation(this, this.entries);
+     op.synchronizeWith(target, versionMember, lostMember);
+   }
+ 
+   /**
+    * Removes any partial entries received in a failed GII; invoked before a
+    * fresh initial image is requested from another member.
+    */
+   protected void cleanUpAfterFailedGII(boolean recoverFromDisk) {
+     DiskRegion dskRgn = getDiskRegion();
+     //if we have a persistent region, instead of deleting everything on disk,
+     //we will just reset the "recovered from disk" flag. After
+     //the next GII we will delete these entries if they do not come
+     //in as part of the GII.
+     if (recoverFromDisk && dskRgn != null && dskRgn.isBackup()) {
+       dskRgn.resetRecoveredEntries(this);
+       return;
+     }
+ 
+     if (!this.entries.isEmpty()) {
+       closeEntries();
+       if (getDiskRegion() != null) {
+         getDiskRegion().clear(this, null);
+       }
+       // clear the left-members and version-tags sets in imageState
+       getImageState().getLeftMembers();
+       getImageState().getVersionTags();
+       // Clear OQL indexes
+       if (this.indexManager != null) {
+         try {
+           this.indexManager.rerunIndexCreationQuery();
+         } catch (Exception ex){
+           if (logger.isDebugEnabled()) {
+             logger.debug("Exception while clearing indexes after GII failure.", ex);
+           }
+         }
+       }
+     }
+   }
+ 
+   private void initMembershipRoles()
+   {
+     synchronized (this.advisorListener) {
+       // hold sync to prevent listener from changing initial members
+       Set others = this.distAdvisor
+           .addMembershipListenerAndAdviseGeneric(this.advisorListener);
+       this.advisorListener.addMembers(others);
+       // initialize missing required roles with initial member info
+       if (getMembershipAttributes().hasRequiredRoles()) {
+         // AdvisorListener will also sync on missingRequiredRoles
+         synchronized (this.missingRequiredRoles) {
+           this.missingRequiredRoles.addAll(getMembershipAttributes()
+               .getRequiredRoles());
+           // remove all the roles we are playing since they will never be
+           // missing
+           this.missingRequiredRoles.removeAll(getSystem()
+               .getDistributedMember().getRoles());
+           for (Iterator iter = others.iterator(); iter.hasNext();) {
+             DistributedMember other = (DistributedMember)iter.next();
+             this.missingRequiredRoles.removeAll(other.getRoles());
+           }
+         }
+       }
+     }
+     if (getMembershipAttributes().hasRequiredRoles()) {
+       // wait up to memberTimeout for required roles...
+ //      boolean requiredRolesAreMissing = false;
+       int memberTimeout = getSystem().getConfig().getMemberTimeout();
+       if (logger.isDebugEnabled()) {
+         logger.debug("Waiting up to {} for required roles.", memberTimeout);
+       }
+       try {
+         if (this.giiMissingRequiredRoles) {
+           // force reliability loss and possibly resumption
+           isInitializingThread = true;
+           synchronized (this.advisorListener) {
+             synchronized (this.missingRequiredRoles) {
+               // forcing state of loss because of bad GII
+               this.isMissingRequiredRoles = true;
+               getCachePerfStats().incReliableRegionsMissing(1);
+               if (getMembershipAttributes().getLossAction().isAllAccess()) {
+                 getCachePerfStats().incReliableRegionsMissingFullAccess(1);
+               } else if (getMembershipAttributes().getLossAction().isLimitedAccess()) {
+                 getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
+               } else if (getMembershipAttributes().getLossAction().isNoAccess()) {
+                 getCachePerfStats().incReliableRegionsMissingNoAccess(1);
+               }
+               if (logger.isDebugEnabled()) {
+                 logger.debug("GetInitialImage had missing required roles.");
+               }
+               // TODO: will this work with RECONNECT and REINITIALIZE?
+               isInitializingThread = true;
+               lostReliability(null, null);
+               if (this.missingRequiredRoles.isEmpty()) {
+                 // all required roles are present so force resumption
+                 this.isMissingRequiredRoles = false;
+                 getCachePerfStats().incReliableRegionsMissing(-1);
+                 if (getMembershipAttributes().getLossAction().isAllAccess()) {
+                   getCachePerfStats().incReliableRegionsMissingFullAccess(-1);
+                 } else if (getMembershipAttributes().getLossAction().isLimitedAccess()) {
+                   getCachePerfStats().incReliableRegionsMissingLimitedAccess(-1);
+                 } else if (getMembershipAttributes().getLossAction().isNoAccess()) {
+                   getCachePerfStats().incReliableRegionsMissingNoAccess(-1);
+                 }
+                 boolean async = resumeReliability(null, null);
+                 if (async) {
+                   advisorListener.destroyed = true;
+                 }
+               }
+             }
+           }
+         }
+         else {
+           if (!getSystem().isLoner()) {
+             waitForRequiredRoles(memberTimeout);
+           }
+           synchronized (this.advisorListener) {
+             synchronized (this.missingRequiredRoles) {
+               if (this.missingRequiredRoles.isEmpty()) {
+                 Assert.assertTrue(!this.isMissingRequiredRoles);
+                 if (logger.isDebugEnabled()) {
+                   logger.debug("Initialization completed with all required roles present.");
+                 }
+               }
+               else {
+                 // starting in state of loss...
+                 this.isMissingRequiredRoles = true;
+                 getCachePerfStats().incReliableRegionsMissing(1);
+                 if (getMembershipAttributes().getLossAction().isAllAccess()) {
+                   getCachePerfStats().incReliableRegionsMissingFullAccess(1);
+                 } else if (getMembershipAttributes().getLossAction().isLimitedAccess()) {
+                   getCachePerfStats().incReliableRegionsMissingLimitedAccess(1);
+                 } else if (getMembershipAttributes().getLossAction().isNoAccess()) {
+                   getCachePerfStats().incReliableRegionsMissingNoAccess(1);
+                 }
+                 
+                 if (logger.isDebugEnabled()) {
+                   logger.debug("Initialization completed with missing required roles: {}", this.missingRequiredRoles);
+                 }
+                 isInitializingThread = true;
+                 lostReliability(null, null);
+               }
+             }
+           }
+         }
+       }
+       catch (RegionDestroyedException ignore) {
+         // ignore to fix bug 34639; may be thrown by waitForRequiredRoles
+       }
+       catch (CancelException ignore) {
+         // ignore to fix bug 34639; may be thrown by waitForRequiredRoles
+         if (isInitializingThread) {
+           throw ignore;
+         }
+       }
+       catch (Exception e) {
+         logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION), e);
+       }
+ 
+     }
+     // open latch which will allow any threads in lostReliability to proceed
+     this.initializationLatchAfterMemberTimeout.countDown();
+   }
+ 
+   private boolean isRecoveryNeeded() {
+     return getDataPolicy().withPersistence()
+       && getDiskRegion().isRecreated();
+   }
+ 
+   // called by InitialImageOperation to clean up destroyed tokens; releases
+   // afterGetInitialImageInitializationLatch before unlocking cleanUpLock
+   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK")
+   private void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus)
+   {
+     //We need to clean up the disk before we release the after get initial image latch
+     DiskRegion dskRgn = getDiskRegion();
+     if (dskRgn != null && dskRgn.isBackup()) {
+       dskRgn.finishInitializeOwner(this, giiStatus);
+     }
+     ImageState is = getImageState();
+     is.lockGII();
+     // clear the version tag and left-members sets
+     is.getVersionTags();
+     is.getLeftMembers();
+     // remove DESTROYED tokens
+     RegionVersionVector rvv = is.getClearRegionVersionVector();
+     try {
+       Iterator/*<Object>*/ keysIt = getImageState().getDestroyedEntries();
+       while (keysIt.hasNext()) {
+         this.entries.removeIfDestroyed(keysIt.next());
+       }
+       if (rvv != null) {
+         // clear any entries received in the GII that are older than the RVV versions.
+         // this can happen if entry chunks were received prior to the clear() being
+         // processed
+         clearEntries(rvv);
+       }
+       //need to do this before we release the afterGetInitialImageLatch
+       if(persistenceAdvisor != null) {
+         persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
+       }
+     }
+     finally {
+       // release the after-gii lock first so basicDestroy will see
+       // isInitialized() be true when they get the cleanUp lock.
+       try {
+         releaseAfterGetInitialImageLatch();
+       } finally { // make sure unlockGII is done for bug 40001
+         is.unlockGII();
+       }
+     }
+     
+     if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER){
+       CacheObserverHolder.getInstance().afterMarkingGIICompleted();
+     }
+     
+     //"Initializing region {0}" which is not acompanied by a completed message. Users think thread is stuck in some operation. Hence adding this log
+     logger.info(LocalizedMessage.create(LocalizedStrings.DistributedRegion_INITIALIZING_REGION_COMPLETED_0, this.getName()));
+   }
+ 
+   /**
+    * @see LocalRegion#basicDestroy(EntryEventImpl, boolean, Object)
+    */
+   @Override
+   protected void basicDestroy(EntryEventImpl event,
+                               boolean cacheWrite,
+                               Object expectedOldValue)
+       throws EntryNotFoundException, CacheWriterException, TimeoutException {
+     // disallow local destruction for mirrored keys/values regions
+     boolean invokeWriter = cacheWrite;
+     boolean hasSeen = hasSeenEvent(event);
+     checkIfReplicatedAndLocalDestroy(event);
+     
+     try {
+       if (this.requiresOneHopForMissingEntry(event)) {
+         // bug #45704: see if a one-hop must be done for this operation
+         RegionEntry re = getRegionEntry(event.getKey());
+         if (re == null /*|| re.isTombstone()*/ || !this.generateVersionTag) {
+           if (this.srp == null) {
+             // only assert for non-client regions.
+             Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
+           }
+           if (!event.isBulkOpInProgress() || this.dataPolicy.withStorage()) {
+             // removeAll will send a single one-hop for empty regions. for other missing entries
+             // we need to get a valid version number before modifying the local cache
+             // TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
+             boolean didDistribute = RemoteDestroyMessage.distribute(event, expectedOldValue, !this.generateVersionTag);
+ 
+             if (!this.generateVersionTag && !didDistribute) {
+               throw new PersistentReplicatesOfflineException();
+             }
+ 
+             if (didDistribute) {
+               if (logger.isTraceEnabled()) {
+                 logger.trace("Event after remoteDestroy operation: {}", event);
+               }
+               invokeWriter = false; // remote cache invoked the writer
+               if (event.getVersionTag() == null) {
+                 // if the event wasn't applied by the one-hop replicate it will not have a version tag
+                 // and so should not be applied to this cache
+                 return;
+               }
+             }
+           }
+         }
+       }
+       
+       super.basicDestroy(event, invokeWriter, expectedOldValue);
+ 
+       // if this is a destroy coming in from remote source, free up lock resources
+       // if this is a local origin destroy, this will happen after lock is
+       // released
+       if (this.scope.isGlobal() && event.isOriginRemote()) {
+         try {
+           getLockService().freeResources(event.getKey());
+         }
+         catch (LockServiceDestroyedException ignore) {
+         }
+       }
+   
+       return;
+     } 
+     finally {
+       if (hasSeen) {
+         if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
+           event.getRemoveAllOperation().addEntry(event, true);
+         }
+         if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+           distributeDestroy(event, expectedOldValue);
+           event.invokeCallbacks(this, true, false);
+         }
+       }
+     }
+   }
+ 
+   @Override
+   void basicDestroyPart3(RegionEntry re, EntryEventImpl event,
+       boolean inTokenMode, boolean duringRI, boolean invokeCallbacks, Object expectedOldValue) {
+   
+     distributeDestroy(event, expectedOldValue);
+     super.basicDestroyPart3(re, event, inTokenMode, duringRI, invokeCallbacks, expectedOldValue);
+   }
+   
+   void distributeDestroy(EntryEventImpl event, Object expectedOldValue) {
+     if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
+       boolean distribute = !event.getInhibitDistribution();
+       if (distribute) {
+         DestroyOperation op = new DestroyOperation(event);
+         op.distribute();
+       }
+     }
+   }
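+ 
+   // Distribution guards shared by the distribute* helpers in this class:
+   // the event must be marked distributed, must originate locally, and (for
+   // destroys) must not belong to a bulk removeAll, whose distribution is
+   // batched through the RemoveAll operation instead.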
+   
+   @Override
+   boolean evictDestroy(LRUEntry entry) {
+     boolean evictDestroyWasDone = super.evictDestroy(entry);
+     if (evictDestroyWasDone && this.scope.isGlobal()) {
+       try {
+         getLockService().freeResources(entry.getKey());
+       }
+       catch (LockServiceDestroyedException ignore) {
+       }
+     }
+     return evictDestroyWasDone;
+   }
+ 
+ 
+   /**
+    * @see LocalRegion#basicInvalidateRegion(RegionEventImpl)
+    */
+   @Override
+   void basicInvalidateRegion(RegionEventImpl event)
+   {
+     // disallow local invalidation for replicated regions
+     if (!event.isDistributed() && getScope().isDistributed()
+         && getDataPolicy().withReplication()) {
+       throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
+     }
+     if (shouldDistributeInvalidateRegion(event)) {
+       distributeInvalidateRegion(event);
+     }
+     super.basicInvalidateRegion(event);
+   }
+ 
+   /**
+    * Decide whether an InvalidateRegionOperation should be sent to peers.
+    * Broken out so that BucketRegion can override it; see the sketch after
+    * this method.
+    * @param event the region-invalidate event under consideration
+    * @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise
+    */
+   protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
+     return event.isDistributed() && !event.isOriginRemote();
+   }
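+ 
+   // A minimal override sketch (illustrative only, not an existing subclass):
+   // a region that must keep invalidation local could veto distribution here.
+   //
+   //   @Override
+   //   protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
+   //     return false; // never send InvalidateRegionOperation to peers
+   //   }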
+ 
+   /**
+    * Distribute the invalidate of a region given its event.
+    * This implementation sends the invalidate to peers.
+    * @since 5.7
+    */
+   protected void distributeInvalidateRegion(RegionEventImpl event) {
+     new InvalidateRegionOperation(event).distribute();
+   }
+ 
+   /**
+    * @see LocalRegion#basicDestroyRegion(RegionEventImpl, boolean, boolean,
+    *      boolean)
+    */
+   @Override
+   void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite,
+       boolean lock, boolean callbackEvents) throws CacheWriterException,
+       TimeoutException
+   {
+     final String path = getFullPath();
+     //Keep track of regions that are being destroyed. This helps avoid a race
+     //when another member concurrently creates this region. See bug 42051.
+     boolean isClose = event.getOperation().isClose();
+     if(!isClose) {
+       cache.beginDestroy(path, this);
+     }
+     try {
+       super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents);
+       // send destroy region operation even if this is a localDestroyRegion (or
+       // close)
+       if (!event.isOriginRemote()) {
+         distributeDestroyRegion(event, true);
+       } else {
+         if(!event.isReinitializing()) {
+           RegionEventImpl localEvent = new RegionEventImpl(this,
+               Operation.REGION_LOCAL_DESTROY, event.getCallbackArgument(), false, getMyId(),
+               generateEventID()/* generate EventID */);
+           distributeDestroyRegion(localEvent, false/*fixes bug 41111*/);
+         }
+       }
+       notifyBridgeClients(event);
+     }
+     catch (CancelException e) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("basicDestroyRegion short-circuited due to cancellation");
+       }
+     }
+     finally {
+       if(!isClose) {
+         cache.endDestroy(path, this);
+       }
+       RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose);
+     }
+   }
+ 
+ 
+   @Override
+   protected void distributeDestroyRegion(RegionEventImpl event,
+                                          boolean notifyOfRegionDeparture) {
+     if(persistenceAdvisor != null) {
+       persistenceAdvisor.releaseTieLock();
+     }
+     new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
+   }
+ 
+   /**
+    * Invalidate the entry, distributing the operation to peers as needed. Has
+    * no effect if the entry was already invalidated.
+    *
+    * @see LocalRegion#basicInvalidate(EntryEventImpl)
+    */
+   @Override
+   void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException
+   {
+     
+     boolean hasSeen = hasSeenEvent(event);
+     try {
+       // disallow local invalidation for replicated regions
+       if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed()
+           && getDataPolicy().withReplication()) {
+         throw new IllegalStateException(LocalizedStrings.DistributedRegion_NOT_ALLOWED_TO_DO_A_LOCAL_INVALIDATION_ON_A_REPLICATED_REGION.toLocalizedString());
+       }
+       if (this.requiresOneHopForMissingEntry(event)) {
+         // bug #45704: see if a one-hop must be done for this operation
+         RegionEntry re = getRegionEntry(event.getKey());
+         if (re == null/* || re.isTombstone()*/ || !this.generateVersionTag) {
+           if (this.srp == null) {
+             // only assert for non-client regions.
+             Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
+           }
+           // TODO: deltaGII: verify that delegating to a peer when this region is also a client is acceptable
+           boolean didDistribute = RemoteInvalidateMessage.distribute(event, !this.generateVersionTag);
+           if (!this.generateVersionTag && !didDistribute) {
+             throw new PersistentReplicatesOfflineException();
+           }
+           if (didDistribute) {
+             if (logger.isDebugEnabled()) {
+               logger.debug("Event after remoteInvalidate operation: {}", event);
+             }
+             if (event.getVersionTag() == null) {
+               // if the event wasn't applied by the one-hop replicate it will not have a version tag
+               // and so should not be applied to this cache
+               return;
+             }
+           }
+         }
+       }
+   
+       super.basicInvalidate(event);
+ 
+       return;
+     } finally {
+       if (hasSeen) {
+         if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+           distributeInvalidate(event);
+           event.invokeCallbacks(this, true, false);
+         }
+       }
+     }
+   }
+ 
+   @Override
+   void basicInvalidatePart3(RegionEntry re, EntryEventImpl event,
+       boolean invokeCallbacks) {
+     distributeInvalidate(event);
+     super.basicInvalidatePart3(re, event, invokeCallbacks);
+   }
+   
+   void distributeInvalidate(EntryEventImpl event) {
+     if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
+         && !isTX() /* only distribute if non-tx */
+         && !event.getInhibitDistribution()) {
+       InvalidateOperation op = new InvalidateOperation(event);
+       op.distribute();
+     }
+   }
+ 
+   
+   @Override
+   void basicUpdateEntryVersion(EntryEventImpl event)
+       throws EntryNotFoundException {
+ 
+     try {
+       if (!hasSeenEvent(event)) {
+         super.basicUpdateEntryVersion(event);
+       }
+       return;
+     } finally {
+       if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+         distributeUpdateEntryVersion(event);
+       }
+     }
+   }
+ 
+   private void distributeUpdateEntryVersion(EntryEventImpl event) {
+     if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
+         && !isTX() /* only distribute if non-tx */) {
+       UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
+       op.distribute();
+     }
+   }
+ 
+   @Override
+   protected void basicClear(RegionEventImpl ev)
+   {
+     Lock dlock = this.getRegionDistributedLockIfGlobal();
+     try {
+       super.basicClear(ev);
+     }
+     finally {
+       if (dlock != null) {
+         dlock.unlock();
+       }
+     }
+   }
+   
+   @Override
+   void basicClear(RegionEventImpl regionEvent, boolean cacheWrite)  {
+     if (this.concurrencyChecksEnabled && !this.dataPolicy.withReplication()) {
+       boolean retry = false;
+       do {
+         // non-replicate regions must defer to a replicate for clear/invalidate of region
+         Set<InternalDistributedMember> repls = this.distAdvisor.adviseReplicates();
+         if (!repls.isEmpty()) {
+           InternalDistributedMember mbr = repls.iterator().next();
+           RemoteRegionOperation op = RemoteRegionOperation.clear(mbr, this);
+           try {
+             op.distribute();
+             return;
+           } catch (CancelException | RemoteOperationException e) {
+             this.stopper.checkCancelInProgress(e);
+             retry = true;
+           }
+         }
+       } while (retry);
+     }
+     // if no version vector or if no replicates are around, use the default mechanism
+     super.basicClear(regionEvent, cacheWrite);
+   }
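+ 
+   // Why the delegation above: with concurrency checks enabled, a
+   // non-replicate cannot produce the version information a consistent clear
+   // needs, so it forwards the clear to any one replicate via
+   // RemoteRegionOperation and retries if that member is shutting down or the
+   // remote operation fails; only if no replicate exists does it fall back to
+   // the local mechanism.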
+   
+   
+   @Override
+   void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+     boolean enableRVV = useRVV && this.dataPolicy.withReplication()
+         && this.concurrencyChecksEnabled && !getDistributionManager().isLoner();
+     
+     //Fix for 46338 - apparently multiple threads from the same VM are allowed
+     //to suspend locking, which is what distributedLockForClear() does. We don't
+     //want that to happen, so we'll synchronize to make sure only one thread on
+     //this member performs a clear.
+     synchronized(clearLock) {
+       if (enableRVV) {
+ 
+         distributedLockForClear();
+         try {
+           Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
+           // pause all generation of versions and flush from the other members to this one
+           try {
+             obtainWriteLocksForClear(regionEvent, participants);
+             clearRegionLocally(regionEvent, cacheWrite, null);
+             if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
+               DistributedClearOperation.clear(regionEvent, null, participants);
+             }
+           } finally {
+             releaseWriteLocksForClear(regionEvent, participants);
+           }
+         }
+         finally {
+           distributedUnlockForClear();
+         }
+       } else {
+         Set<InternalDistributedMember> participants = getCacheDistributionAdvisor().adviseInvalidateRegion();
+         clearRegionLocally(regionEvent, cacheWrite, null);
+         if (!regionEvent.isOriginRemote() && regionEvent.isDistributed()) {
+           DistributedClearOperation.clear(regionEvent, null, participants);
+         }
+       }
+     }
+     
+     // since clients do not maintain RVVs except for tombstone GC
+     // we need to ensure that current ops reach the client queues
+     // before queuing a clear, but there is no infrastructure for doing so
+     notifyBridgeClients(regionEvent);
+   }
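+ 
+   // The hazard the clearLock above guards against (illustrative sketch,
+   // assuming two application threads on this same member):
+   //
+   //   Thread A: region.clear(); // suspends locking in distributedLockForClear()
+   //   Thread B: region.clear(); // the same VM may also suspend locking
+   //
+   // Without the synchronization both threads could run the lock-suspending
+   // clear protocol at once; clearLock serializes them on this member.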
+ 
+   /**
+    * Obtain a distributed lock for the clear operation.
+    */
+   private void distributedLockForClear() {
+     if (!this.scope.isGlobal()) {  // non-global regions

<TRUNCATED>