You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/01 23:07:02 UTC
[35/54] [abbrv] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/dd98a558/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 0c967c9..4bdd67d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -12,14 +12,54 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.LossAction;
+import org.apache.geode.cache.MembershipAttributes;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAccessException;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionDistributionException;
+import org.apache.geode.cache.RegionMembershipListener;
+import org.apache.geode.cache.ResumptionAction;
+import org.apache.geode.cache.RoleException;
+import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
@@ -31,9 +71,14 @@ import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.Role;
-import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionAdvisee;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionAdvisor.ProfileVisitor;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockRemoteToken;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -45,9 +90,21 @@ import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
import org.apache.geode.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
-import org.apache.geode.internal.cache.execute.*;
+import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor;
+import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultSender;
+import org.apache.geode.internal.cache.execute.DistributedRegionFunctionResultWaiter;
+import org.apache.geode.internal.cache.execute.FunctionStats;
+import org.apache.geode.internal.cache.execute.LocalResultCollector;
+import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
+import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.lru.LRUEntry;
-import org.apache.geode.internal.cache.persistence.*;
+import org.apache.geode.internal.cache.partitioned.Bucket;
+import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor;
+import org.apache.geode.internal.cache.persistence.PersistenceAdvisor;
+import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
+import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.persistence.PersistentMemberManager;
+import org.apache.geode.internal.cache.persistence.PersistentMemberView;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
@@ -63,21 +120,7 @@ import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.sequencelog.RegionLogger;
import org.apache.geode.internal.util.concurrent.StoppableCountDownLatch;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-/**
- *
- */
@SuppressWarnings("deprecation")
public class DistributedRegion extends LocalRegion implements CacheDistributionAdvisee {
private static final Logger logger = LogService.getLogger();
@@ -91,17 +134,17 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
final CacheDistributionAdvisor distAdvisor;
/**
- * @guarded.By {@link #dlockMonitor}
+ * GuardedBy {@link #dlockMonitor}
*/
private DistributedLockService dlockService;
- protected final AdvisorListener advisorListener = new AdvisorListener();
+ final AdvisorListener advisorListener = new AdvisorListener();
/** Set of currently missing required roles */
- protected final HashSet missingRequiredRoles = new HashSet();
+ final HashSet missingRequiredRoles = new HashSet(); // package-private to avoid synthetic accessor
/** True if this region is currently missing any required roles */
- protected volatile boolean isMissingRequiredRoles = false;
+ private volatile boolean isMissingRequiredRoles = false;
/**
* True if this region is has any required roles defined and the LossAction is either NO_ACCESS or
@@ -113,7 +156,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* Latch that is opened after initialization waits for required roles up to the
* <a href="DistributedSystem#member-timeout">member-timeout </a>.
*/
- protected final StoppableCountDownLatch initializationLatchAfterMemberTimeout;
+ private final StoppableCountDownLatch initializationLatchAfterMemberTimeout;
private final PersistenceAdvisor persistenceAdvisor;
@@ -134,11 +177,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*/
private final Object clearLock = new Object();
- private static AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
+ private static final AtomicBoolean loggedNetworkPartitionWarning = new AtomicBoolean(false);
/** Creates a new instance of DistributedRegion */
protected DistributedRegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion,
- GemFireCacheImpl cache, InternalRegionArguments internalRegionArgs) {
+ InternalCache cache, InternalRegionArguments internalRegionArgs) {
super(regionName, attrs, parentRegion, cache, internalRegionArgs);
this.initializationLatchAfterMemberTimeout =
new StoppableCountDownLatch(getCancelCriterion(), 1);
@@ -196,20 +239,19 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
diskStats = null;
}
PersistentMemberManager memberManager = cache.getPersistentMemberManager();
- this.persistenceAdvisor = new PersistenceAdvisorImpl(distAdvisor, dl, storage,
+ this.persistenceAdvisor = new PersistenceAdvisorImpl(this.distAdvisor, dl, storage,
this.getFullPath(), diskStats, memberManager);
- } catch (Exception e) {
+ } catch (Exception ignore) { // TODO: wrap exception in throw
throw new InternalGemFireError("Couldn't recover persistence");
}
} else {
this.persistenceAdvisor = null;
}
if (this.persistenceAdvisor != null) {
- this.persistentId = persistenceAdvisor.generatePersistentID();
+ this.persistentId = this.persistenceAdvisor.generatePersistentID();
} else {
this.persistentId = null;
}
-
}
@Override
@@ -225,10 +267,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*/
protected CacheDistributionAdvisor createDistributionAdvisor(
InternalRegionArguments internalRegionArgs) {
- return CacheDistributionAdvisor.createCacheDistributionAdvisor(this); // Warning: potential
- // early escape of object
- // before full
- // construction
+ // Warning: potential early escape of object before full construction
+ return CacheDistributionAdvisor.createCacheDistributionAdvisor(this);
}
/**
@@ -256,14 +296,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (!this.generateVersionTag) {
return true;
}
- return this.concurrencyChecksEnabled && (this.srp == null) && !isTX()
+ return this.concurrencyChecksEnabled && (this.serverRegionProxy == null) && !isTX()
&& this.scope.isDistributed() && !this.dataPolicy.withReplication();
}
- /**
- * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object, boolean, long, boolean)
- */
@Override
protected boolean virtualPut(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
@@ -276,8 +313,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
!event.isNetSearch() && // search and load processor handles own locking
!event.isNetLoad() &&
// @todo darrel/kirk: what about putAll?
- !event.isLocalLoad() && !event.isSingleHopPutOp()) { // Single Hop Op means dlock is already
- // taken at origin node.
+ !event.isLocalLoad() && !event.isSingleHopPutOp()) {
+ // Single Hop Op means dlock is already taken at origin node.
dlock = this.getDistributedLockIfGlobal(event.getKey());
}
if (isTraceEnabled) {
@@ -332,10 +369,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
logger.trace("DR.virtualPut: this cache has already seen this event {}", event);
}
- // Gester, Fix 39014: when hasSeenEvent, put will still distribute
+ // Fix 39014: when hasSeenEvent, put will still distribute
// event, but putAll did not. We add the logic back here, not to put
// back into DR.distributeUpdate() because we moved this part up into
- // LR.basicPutPart3 in purpose. Reviewed by Bruce.
+ // LR.basicPutPart3 in purpose.
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
event.getPutAllOperation().addEntry(event, true);
}
@@ -409,7 +446,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
event.getRemoveAllOperation().addEntry(event);
} else {
basicDestroy(event, true, null);
- // getSharedDataView().destroyExistingEntry(event, true, null);
}
}
@@ -448,7 +484,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- protected void setGeneratedVersionTag(boolean generateVersionTag) {
+ void setGeneratedVersionTag(boolean generateVersionTag) {
// there is at-least one other persistent member, so turn on concurrencyChecks
enableConcurrencyChecks();
@@ -470,7 +506,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
|| !this.generateVersionTag) {
return false;
}
- if (this.srp != null) { // client
+ if (this.serverRegionProxy != null) { // client
return false;
}
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
@@ -513,8 +549,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
Set roles = Collections.unmodifiableSet(new HashSet(this.missingRequiredRoles));
throw new RegionAccessException(
LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1
- .toLocalizedString(
- new Object[] {getMembershipAttributes().getLossAction(), roles}),
+ .toLocalizedString(getMembershipAttributes().getLossAction(), roles),
getFullPath(), roles);
}
}
@@ -540,8 +575,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
Assert.assertTrue(!roles.isEmpty());
throw new RegionAccessException(
LocalizedStrings.DistributedRegion_OPERATION_IS_DISALLOWED_BY_LOSSACTION_0_BECAUSE_THESE_REQUIRED_ROLES_ARE_MISSING_1
- .toLocalizedString(
- new Object[] {getMembershipAttributes().getLossAction(), roles}),
+ .toLocalizedString(getMembershipAttributes().getLossAction(), roles),
getFullPath(), roles);
}
}
@@ -550,30 +584,30 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
@Override
protected void handleReliableDistribution(Set successfulRecipients) {
- handleReliableDistribution(successfulRecipients, Collections.EMPTY_SET, Collections.EMPTY_SET);
+ handleReliableDistribution(successfulRecipients, Collections.emptySet(),
+ Collections.emptySet());
}
- protected void handleReliableDistribution(Set successfulRecipients, Set otherRecipients1,
+ private void handleReliableDistribution(Set successfulRecipients, Set otherRecipients1,
Set otherRecipients2) {
if (this.requiresReliabilityCheck) {
MembershipAttributes ra = getMembershipAttributes();
- Set recipients = successfulRecipients;
// determine the successful roles
Set roles = new HashSet();
- for (Iterator iter = recipients.iterator(); iter.hasNext();) {
- InternalDistributedMember mbr = (InternalDistributedMember) iter.next();
+ for (Object successfulRecipient : successfulRecipients) {
+ InternalDistributedMember mbr = (InternalDistributedMember) successfulRecipient;
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
- for (Iterator iter = otherRecipients1.iterator(); iter.hasNext();) {
- InternalDistributedMember mbr = (InternalDistributedMember) iter.next();
+ for (Object anOtherRecipients1 : otherRecipients1) {
+ InternalDistributedMember mbr = (InternalDistributedMember) anOtherRecipients1;
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
}
- for (Iterator iter = otherRecipients2.iterator(); iter.hasNext();) {
- InternalDistributedMember mbr = (InternalDistributedMember) iter.next();
+ for (Object anOtherRecipients2 : otherRecipients2) {
+ InternalDistributedMember mbr = (InternalDistributedMember) anOtherRecipients2;
if (mbr != null) {
roles.addAll(mbr.getRoles());
}
@@ -581,22 +615,18 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// determine the missing roles
Set failedRoles = new HashSet(ra.getRequiredRoles());
failedRoles.removeAll(roles);
- if (failedRoles.isEmpty())
+ if (failedRoles.isEmpty()) {
return;
- // if (rp.isAllAccessWithQueuing()) {
- // this.rmq.add(data, failedRoles);
- // } else {
+ }
throw new RegionDistributionException(
LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_MAY_HAVE_FAILED_TO_NOTIFY_THESE_REQUIRED_ROLES_0
.toLocalizedString(failedRoles),
getFullPath(), failedRoles);
- // }
}
}
/**
- *
* Called when we do a distributed operation and don't have anyone to distributed it too. Since
* this is only called when no distribution was done (i.e. no recipients) we do not check
* isMissingRequiredRoles because it might not longer be true due to race conditions
@@ -607,18 +637,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* LIMITED_ACCESS.
* @since GemFire 5.0
*/
- protected boolean isNoDistributionOk() {
+ boolean isNoDistributionOk() {
if (this.requiresReliabilityCheck) {
MembershipAttributes ra = getMembershipAttributes();
- // if (ra.getLossAction().isAllAccessWithQueuing()) {
- // return !ra.hasRequiredRoles();
- // } else {
- Set failedRoles = ra.getRequiredRoles();
+ Set<Role> failedRoles = ra.getRequiredRoles();
throw new RegionDistributionException(
LocalizedStrings.DistributedRegion_OPERATION_DISTRIBUTION_WAS_NOT_DONE_TO_THESE_REQUIRED_ROLES_0
.toLocalizedString(failedRoles),
getFullPath(), failedRoles);
- // }
}
return true;
}
@@ -633,76 +659,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return false;
}
-
@Override
public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
return !doesNotDistribute() && super.shouldSyncForCrashedMember(id);
}
-
- /**
- * Adjust the specified set of recipients by removing any of them that are currently having their
- * data queued.
- *
- * @param recipients the set of recipients that a message is to be distributed too. Recipients
- * that are currently having their data queued will be removed from this set.
- * @return the set, possibly null, of recipients that are currently having their data queued.
- * @since GemFire 5.0
- */
- protected Set adjustForQueuing(Set recipients) {
- Set result = null;
- // if (this.requiresReliabilityCheck) {
- // MembershipAttributes ra = getMembershipAttributes();
- // if (ra.getLossAction().isAllAccessWithQueuing()) {
- // Set currentQueuedRoles = this.rmq.getQueuingRoles();
- // if (currentQueuedRoles != null) {
- // // foreach recipient see if any of his roles are queued and if
- // // they are remove him from recipients and add him to result
- // Iterator it = recipients.iterator();
- // while (it.hasNext()) {
- // DistributedMember dm = (DistributedMember)it.next();
- // Set dmRoles = dm.getRoles();
- // if (!dmRoles.isEmpty()) {
- // if (intersects(dmRoles, currentQueuedRoles)) {
- // it.remove(); // fix for bug 34447
- // if (result == null) {
- // result = new HashSet();
- // }
- // result.add(dm);
- // }
- // }
- // }
- // }
- // }
- // }
- return result;
- }
-
- /**
- * Returns true if the two sets intersect
- *
- * @param a a non-null non-empty set
- * @param b a non-null non-empty set
- * @return true if sets a and b intersect; false if not
- * @since GemFire 5.0
- */
- public static boolean intersects(Set a, Set b) {
- Iterator it;
- Set target;
- if (a.size() <= b.size()) {
- it = a.iterator();
- target = b;
- } else {
- it = b.iterator();
- target = a;
- }
- while (it.hasNext()) {
- if (target.contains(it.next()))
- return true;
- }
- return false;
- }
-
@Override
public boolean requiresReliabilityCheck() {
return this.requiresReliabilityCheck;
@@ -736,7 +697,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*
* @return true if asynchronous resumption is triggered
*/
- protected boolean resumeReliability(InternalDistributedMember id, Set newlyAcquiredRoles) {
+ private boolean resumeReliability(InternalDistributedMember id, Set newlyAcquiredRoles) {
boolean async = false;
try {
ResumptionAction ra = getMembershipAttributes().getResumptionAction();
@@ -763,6 +724,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
final Set newlyAcquiredRoles) throws RejectedExecutionException {
final ResumptionAction ra = getMembershipAttributes().getResumptionAction();
getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ @Override
public void run() {
try {
if (ra.isReinitialize()) {
@@ -834,8 +796,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*
* @return true if asynchronous resumption is triggered
*/
- protected boolean lostReliability(final InternalDistributedMember id,
- final Set newlyMissingRoles) {
+ private boolean lostReliability(final InternalDistributedMember id, final Set newlyMissingRoles) {
if (DistributedRegion.ignoreReconnect) { // test hook
return false;
}
@@ -844,7 +805,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
try {
if (getMembershipAttributes().getLossAction().isReconnect()) {
async = true;
- if (isInitializingThread) {
+ if (this.isInitializingThread) {
doLostReliability(true, id, newlyMissingRoles);
} else {
doLostReliability(false, id, newlyMissingRoles);
@@ -852,18 +813,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// we don't do this in the waiting pool because we're going to
// disconnect
// the distributed system, and it will wait for the pool to empty
- /*
- * moved to a new method called doLostReliablity. Thread t = new
- * Thread("Reconnect Distributed System") { public void run() { try { // TODO: may need to
- * check isReconnecting and checkReadiness... initializationLatchAfterMemberTimeout.await();
- * // TODO: call reconnect here getSystem().tryReconnect((GemFireCache)getCache()); // added
- * for reconnect. synchronized (missingRequiredRoles) { // any number of threads may be
- * waiting on missingRequiredRoles missingRequiredRoles.notifyAll(); // need to fire an
- * event if id is not null if (hasListener() && id != null) { RoleEventImpl relEvent = new
- * RoleEventImpl( DistributedRegion.this, Operation.CACHE_RECONNECT, null, true, id,
- * newlyMissingRoles); dispatchListenerEvent( EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
- * } } } catch (Exception e) { } } }; t.setDaemon(true); t.start();
- */
}
} catch (CancelException cce) {
throw cce;
@@ -879,7 +828,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
try {
if (!isInitializing) {
// moved code to a new thread.
- Thread t = new Thread(
+ Thread thread = new Thread(
LocalizedStrings.DistributedRegion_RECONNECT_DISTRIBUTED_SYSTEM.toLocalizedString()) {
@Override
public void run() {
@@ -907,15 +856,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
};
- t.setDaemon(true);
- t.start();
+ thread.setDaemon(true);
+ thread.start();
} else {
getSystem().tryReconnect(false, "Role Loss", getCache()); // added for
// reconnect.
- synchronized (missingRequiredRoles) {
+ synchronized (this.missingRequiredRoles) {
// any number of threads may be waiting on missingRequiredRoles
- missingRequiredRoles.notifyAll();
+ this.missingRequiredRoles.notifyAll();
// need to fire an event if id is not null
if (hasListener() && id != null) {
RoleEventImpl relEvent = new RoleEventImpl(DistributedRegion.this,
@@ -923,10 +872,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
dispatchListenerEvent(EnumListenerEvent.AFTER_ROLE_LOSS, relEvent);
}
}
- // } catch (CancelException cce){
-
- // }
-
}
} catch (CancelException ignor) {
throw ignor;
@@ -934,12 +879,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
logger.fatal(LocalizedMessage.create(LocalizedStrings.DistributedRegion_UNEXPECTED_EXCEPTION),
e);
}
-
}
- protected void lockCheckReadiness() {
+ void lockCheckReadiness() { // package-private to avoid synthetic accessor
// fix for bug 32610
- cache.getCancelCriterion().checkCancelInProgress(null);
+ this.cache.getCancelCriterion().checkCancelInProgress(null);
checkReadiness();
}
@@ -956,9 +900,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- /**
- * @see LocalRegion#localDestroyNoCallbacks(Object)
- */
@Override
public void localDestroyNoCallbacks(Object key) {
super.localDestroyNoCallbacks(key);
@@ -970,9 +911,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- /**
- * @see LocalRegion#localDestroy(Object, Object)
- */
@Override
public void localDestroy(Object key, Object aCallbackArgument) throws EntryNotFoundException {
super.localDestroy(key, aCallbackArgument);
@@ -984,9 +922,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- /**
- * @see LocalRegion#invalidate(Object, Object)
- */
@Override
public void invalidate(Object key, Object aCallbackArgument)
throws TimeoutException, EntryNotFoundException {
@@ -996,7 +931,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
checkForLimitedOrNoAccess();
Lock dlock = this.getDistributedLockIfGlobal(key);
try {
- super.validatedInvalidate(key, aCallbackArgument);
+ validatedInvalidate(key, aCallbackArgument);
} finally {
if (dlock != null)
dlock.unlock();
@@ -1037,8 +972,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* Called while NOT holding lock on parent's subregions
*
* @throws IllegalStateException if region is not compatible with a region in another VM.
- *
- * @see LocalRegion#initialize(InputStream, InternalDistributedMember, InternalRegionArguments)
*/
@Override
protected void initialize(InputStream snapshotInputStream, InternalDistributedMember imageTarget,
@@ -1060,7 +993,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
try {
try {
- PersistentMemberID persistentId = null;
+ PersistentMemberID persistentMemberId = null;
boolean recoverFromDisk = isRecoveryNeeded();
DiskRegion dskRgn = getDiskRegion();
if (recoverFromDisk) {
@@ -1071,7 +1004,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (logger.isDebugEnabled()) {
logger.debug("DistributedRegion.getInitialImageAndRecovery: Finished Recovery");
}
- persistentId = dskRgn.getMyPersistentID();
+ persistentMemberId = dskRgn.getMyPersistentID();
}
// Create OQL indexes before starting GII.
@@ -1079,7 +1012,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (getDataPolicy().withReplication() || getDataPolicy().withPreloaded()) {
getInitialImageAndRecovery(snapshotInputStream, imageTarget, internalRegionArgs,
- recoverFromDisk, persistentId);
+ recoverFromDisk, persistentMemberId);
} else {
new CreateRegionProcessor(this).initializeRegion();
if (snapshotInputStream != null) {
@@ -1093,9 +1026,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
initMembershipRoles();
- isInitializingThread = false;
- super.initialize(null, null, null); // makes sure all latches are released if they haven't
- // been already
+ this.isInitializingThread = false;
+ // makes sure all latches are released if they haven't been already
+ super.initialize(null, null, null);
} finally {
if (this.eventTracker != null) {
this.eventTracker.setInitialized();
@@ -1114,40 +1047,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
/**
* A reference counter to protected the memoryThresholdReached boolean
*/
- private final Set<DistributedMember> memoryThresholdReachedMembers =
- new CopyOnWriteArraySet<DistributedMember>();
-
- /** Sets and returns giiMissingRequiredRoles */
- private boolean checkInitialImageForReliability(InternalDistributedMember imageTarget,
- CacheDistributionAdvisor.InitialImageAdvice advice) {
- // assumption: required roles are interesting to GII only if Reinitialize...
- // if (true)
- return false;
- // if (getMembershipAttributes().hasRequiredRoles()
- // && getMembershipAttributes().getResumptionAction().isReinitialize()) {
- // // are any required roles missing for GII with Reinitialize?
- // Set missingRR = new HashSet(getMembershipAttributes().getRequiredRoles());
- // missingRR.removeAll(getSystem().getDistributedMember().getRoles());
- // for (Iterator iter = advice.replicates.iterator(); iter.hasNext();) {
- // DistributedMember member = (DistributedMember)iter.next();
- // missingRR.removeAll(member.getRoles());
- // }
- // for (Iterator iter = advice.others.iterator(); iter.hasNext();) {
- // DistributedMember member = (DistributedMember)iter.next();
- // missingRR.removeAll(member.getRoles());
- // }
- // for (Iterator iter = advice.preloaded.iterator(); iter.hasNext();) {
- // DistributedMember member = (DistributedMember)iter.next();
- // missingRR.removeAll(member.getRoles());
- // }
- // if (!missingRR.isEmpty()) {
- // // entering immediate loss condition, which will cause reinit on resume
- // this.giiMissingRequiredRoles = true;
- // }
- // }
- // return this.giiMissingRequiredRoles;
- }
+ private final Set<DistributedMember> memoryThresholdReachedMembers = new CopyOnWriteArraySet<>();
+ // TODO: cleanup getInitialImageAndRecovery
private void getInitialImageAndRecovery(InputStream snapshotInputStream,
InternalDistributedMember imageSrc, InternalRegionArguments internalRegionArgs,
boolean recoverFromDisk, PersistentMemberID persistentId) throws TimeoutException {
@@ -1158,17 +1060,16 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
imgState.init();
boolean targetRecreated = internalRegionArgs.getRecreateFlag();
Boolean isCBool = (Boolean) isConversion.get();
- boolean isForConversion = isCBool != null ? isCBool.booleanValue() : false;
+ boolean isForConversion = isCBool != null ? isCBool : false;
if (recoverFromDisk && snapshotInputStream != null && !isForConversion) {
throw new InternalGemFireError(
LocalizedStrings.DistributedRegion_IF_LOADING_A_SNAPSHOT_THEN_SHOULD_NOT_BE_RECOVERING_ISRECOVERING_0_SNAPSHOTSTREAM_1
- .toLocalizedString(
- new Object[] {Boolean.valueOf(recoverFromDisk), snapshotInputStream}));
+ .toLocalizedString(new Object[] {true, snapshotInputStream}));
}
ProfileExchangeProcessor targetProvider;
- if (dataPolicy.withPersistence()) {
+ if (this.dataPolicy.withPersistence()) {
targetProvider =
new CreatePersistentRegionProcessor(this, getPersistenceAdvisor(), recoverFromDisk);
} else {
@@ -1177,15 +1078,15 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
imgState.setInRecovery(false);
RegionVersionVector recovered_rvv = null;
- if (dataPolicy.withPersistence()) {
- recovered_rvv = (this.getVersionVector() == null ? null
- : this.getVersionVector().getCloneForTransmission());
+ if (this.dataPolicy.withPersistence()) {
+ recovered_rvv = this.getVersionVector() == null ? null
+ : this.getVersionVector().getCloneForTransmission();
}
// initializeRegion will send out our profile
targetProvider.initializeRegion();
- if (persistenceAdvisor != null) {
- persistenceAdvisor.initialize();
+ if (this.persistenceAdvisor != null) {
+ this.persistenceAdvisor.initialize();
}
// Register listener here so that the remote members are known
@@ -1193,7 +1094,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// remote members
if (!isInternalRegion()) {
if (!this.isDestroyed) {
- cache.getInternalResourceManager().addResourceListener(ResourceType.MEMORY, this);
+ this.cache.getInternalResourceManager().addResourceListener(ResourceType.MEMORY, this);
}
}
@@ -1212,9 +1113,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
loadSnapshotDuringInitialization(snapshotInputStream);
} catch (IOException e) {
- throw new RuntimeException(e); // @todo change this exception?
+ throw new RuntimeException(e); // TODO: change this exception?
} catch (ClassNotFoundException e) {
- throw new RuntimeException(e); // @todo change this exception?
+ throw new RuntimeException(e); // TODO: change this exception?
}
cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
return;
@@ -1227,25 +1128,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// or not)
InitialImageOperation iiop = new InitialImageOperation(this, this.entries);
- // [defunct] Special case GII for PR admin regions (which are always
- // replicates and always writers
- // bruce: this was commented out after adding the GIIAckRequest logic to
- // force
- // consistency before the gii operation begins
- // if (isUsedForPartitionedRegionAdmin() ||
- // isUsedForPartitionedRegionBucket()) {
- // releaseBeforeGetInitialImageLatch();
- // iiop.getFromAll(this.distAdvisor.adviseGeneric(), false);
- // cleanUpDestroyedTokens();
- // return;
- // }
-
CacheDistributionAdvisor.InitialImageAdvice advice = null;
boolean done = false;
while (!done && !isDestroyed()) {
advice = targetProvider.getInitialImageAdvice(advice);
- checkInitialImageForReliability(imageSrc, advice);
boolean attemptGetFromOne = imageSrc != null // we were given a specific member
|| this.dataPolicy.withPreloaded() && !advice.preloaded.isEmpty() // this is a preloaded
// region
@@ -1331,12 +1218,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- /*
- * no more union GII // do union getInitialImage Set rest = new HashSet();
- * rest.addAll(advice.others); rest.addAll(advice.preloaded); // push profile w/ recovery
- * flag turned off at same time that we // do a union getInitialImage boolean pushProfile
- * = recoverFromDisk; iiop.getFromAll(rest, pushProfile);
- */
cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus.NO_GII);
done = true;
return;
@@ -1344,13 +1225,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
break;
}
}
-
- return;
- }
-
- private void synchronizeWith(InternalDistributedMember target, VersionSource idToRecover) {
- InitialImageOperation op = new InitialImageOperation(this, this.entries);
- op.synchronizeWith(target, idToRecover, null);
}
/**
@@ -1359,7 +1233,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*/
public void synchronizeForLostMember(InternalDistributedMember lostMember,
VersionSource lostVersionID) {
- if (this.concurrencyChecksEnabled == false) {
+ if (!this.concurrencyChecksEnabled) {
return;
}
CacheDistributionAdvisor advisor = getCacheDistributionAdvisor();
@@ -1379,11 +1253,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
op.synchronizeWith(target, versionMember, lostMember);
}
- /**
- * invoked just before an initial image is requested from another member
- */
/** remove any partial entries received in a failed GII */
- protected void cleanUpAfterFailedGII(boolean recoverFromDisk) {
+ void cleanUpAfterFailedGII(boolean recoverFromDisk) {
DiskRegion dskRgn = getDiskRegion();
// if we have a persistent region, instead of deleting everything on disk,
// we will just reset the "recovered from disk" flag. After
@@ -1428,8 +1299,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// remove all the roles we are playing since they will never be
// missing
this.missingRequiredRoles.removeAll(getSystem().getDistributedMember().getRoles());
- for (Iterator iter = others.iterator(); iter.hasNext();) {
- DistributedMember other = (DistributedMember) iter.next();
+ for (Object other1 : others) {
+ DistributedMember other = (DistributedMember) other1;
this.missingRequiredRoles.removeAll(other.getRoles());
}
}
@@ -1445,7 +1316,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
try {
if (this.giiMissingRequiredRoles) {
// force reliability loss and possibly resumption
- isInitializingThread = true;
+ this.isInitializingThread = true;
synchronized (this.advisorListener) {
synchronized (this.missingRequiredRoles) {
// forcing state of loss because of bad GII
@@ -1462,7 +1333,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
logger.debug("GetInitialImage had missing required roles.");
}
// TODO: will this work with RECONNECT and REINITIALIZE?
- isInitializingThread = true;
+ this.isInitializingThread = true;
lostReliability(null, null);
if (this.missingRequiredRoles.isEmpty()) {
// all required roles are present so force resumption
@@ -1477,7 +1348,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// pur code to increment the stats.
boolean async = resumeReliability(null, null);
if (async) {
- advisorListener.destroyed = true;
+ this.advisorListener.destroyed = true;
}
}
}
@@ -1508,7 +1379,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
logger.debug("Initialization completed with missing required roles: {}",
this.missingRequiredRoles);
}
- isInitializingThread = true;
+ this.isInitializingThread = true;
lostReliability(null, null);
}
}
@@ -1516,10 +1387,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
} catch (RegionDestroyedException ignore) {
// ignore to fix bug 34639 may be thrown by waitForRequiredRoles
- } catch (CancelException ignore) {
+ } catch (CancelException e) {
// ignore to fix bug 34639 may be thrown by waitForRequiredRoles
- if (isInitializingThread) {
- throw ignore;
+ if (this.isInitializingThread) {
+ throw e;
}
} catch (Exception e) {
logger.fatal(
@@ -1538,7 +1409,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// called by InitialImageOperation to clean up destroyed tokens
// release afterGetInitialImageInitializationLatch before unlocking
// cleanUpLock
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UL_UNRELEASED_LOCK")
+ @SuppressWarnings("UL_UNRELEASED_LOCK")
protected void cleanUpDestroyedTokensAndMarkGIIComplete(GIIStatus giiStatus) {
// We need to clean up the disk before we release the after get initial image latch
DiskRegion dskRgn = getDiskRegion();
@@ -1564,8 +1435,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
clearEntries(rvv);
}
// need to do this before we release the afterGetInitialImageLatch
- if (persistenceAdvisor != null) {
- persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
+ if (this.persistenceAdvisor != null) {
+ this.persistenceAdvisor.setOnline(GIIStatus.didGII(giiStatus), false, getPersistentID());
}
} finally {
// release after gii lock first so basicDestroy will see isInitialized()
@@ -1588,14 +1459,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
LocalizedStrings.DistributedRegion_INITIALIZING_REGION_COMPLETED_0, this.getName()));
}
- /**
- * @see LocalRegion#basicDestroy(EntryEventImpl, boolean, Object)
- */
@Override
protected void basicDestroy(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
throws EntryNotFoundException, CacheWriterException, TimeoutException {
// disallow local destruction for mirrored keysvalues regions
- boolean invokeWriter = cacheWrite;
boolean hasSeen = false;
if (hasSeenEvent(event)) {
hasSeen = true;
@@ -1603,11 +1470,12 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
checkIfReplicatedAndLocalDestroy(event);
try {
+ boolean invokeWriter = cacheWrite;
if (this.requiresOneHopForMissingEntry(event)) {
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null /* || re.isTombstone() */ || !this.generateVersionTag) {
- if (this.srp == null) {
+ if (this.serverRegionProxy == null) {
// only assert for non-client regions.
Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
}
@@ -1651,7 +1519,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- return;
} finally {
if (hasSeen) {
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
@@ -1699,10 +1566,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return evictDestroyWasDone;
}
-
- /**
- * @see LocalRegion#basicInvalidateRegion(RegionEventImpl)
- */
@Override
void basicInvalidateRegion(RegionEventImpl event) {
// disallow local invalidation for replicated regions
@@ -1721,7 +1584,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* decide if InvalidateRegionOperation should be sent to peers. broken out so that BucketRegion
* can override
*
- * @param event
* @return true if {@link InvalidateRegionOperation} should be distributed, false otherwise
*/
protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
@@ -1738,9 +1600,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
new InvalidateRegionOperation(event).distribute();
}
- /**
- * @see LocalRegion#basicDestroyRegion(RegionEventImpl, boolean, boolean, boolean)
- */
@Override
void basicDestroyRegion(RegionEventImpl event, boolean cacheWrite, boolean lock,
boolean callbackEvents) throws CacheWriterException, TimeoutException {
@@ -1749,7 +1608,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// when another member concurrently creates this region. See bug 42051.
boolean isClose = event.getOperation().isClose();
if (!isClose) {
- cache.beginDestroy(path, this);
+ this.cache.beginDestroy(path, this);
}
try {
super.basicDestroyRegion(event, cacheWrite, lock, callbackEvents);
@@ -1772,17 +1631,16 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
} finally {
if (!isClose) {
- cache.endDestroy(path, this);
+ this.cache.endDestroy(path, this);
}
RegionLogger.logDestroy(path, getMyId(), getPersistentID(), isClose);
}
}
-
@Override
protected void distributeDestroyRegion(RegionEventImpl event, boolean notifyOfRegionDeparture) {
- if (persistenceAdvisor != null) {
- persistenceAdvisor.releaseTieLock();
+ if (this.persistenceAdvisor != null) {
+ this.persistenceAdvisor.releaseTieLock();
}
new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
}
@@ -1790,16 +1648,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
/**
* Return true if invalidation occurred; false if it did not, for example if it was already
* invalidated
- *
- * @see LocalRegion#basicInvalidate(EntryEventImpl)
*/
@Override
void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException {
-
boolean hasSeen = false;
if (hasSeenEvent(event)) {
hasSeen = true;
}
+
try {
// disallow local invalidation for replicated regions
if (event.isLocalInvalid() && !event.getOperation().isLocal() && getScope().isDistributed()
@@ -1812,7 +1668,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// bug #45704: see if a one-hop must be done for this operation
RegionEntry re = getRegionEntry(event.getKey());
if (re == null/* || re.isTombstone() */ || !this.generateVersionTag) {
- if (this.srp == null) {
+ if (this.serverRegionProxy == null) {
// only assert for non-client regions.
Assert.assertTrue(!this.dataPolicy.withReplication() || !this.generateVersionTag);
}
@@ -1838,7 +1694,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
super.basicInvalidate(event);
- return;
} finally {
if (hasSeen) {
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
@@ -1871,26 +1726,25 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
@Override
void basicUpdateEntryVersion(EntryEventImpl event) throws EntryNotFoundException {
- LocalRegion lr = event.getLocalRegion();
- AbstractRegionMap arm = ((AbstractRegionMap) lr.getRegionMap());
+ LocalRegion localRegion = event.getLocalRegion();
+ AbstractRegionMap regionMap = (AbstractRegionMap) localRegion.getRegionMap();
try {
- arm.lockForCacheModification(lr, event);
+ regionMap.lockForCacheModification(localRegion, event);
try {
if (!hasSeenEvent(event)) {
super.basicUpdateEntryVersion(event);
}
- return;
} finally {
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
distributeUpdateEntryVersion(event);
}
}
} finally {
- arm.releaseCacheModificationLock(lr, event);
+ regionMap.releaseCacheModificationLock(localRegion, event);
}
}
- protected void distributeUpdateEntryVersion(EntryEventImpl event) {
+ void distributeUpdateEntryVersion(EntryEventImpl event) {
if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
&& !isTX() /* only distribute if non-tx */) {
if (event.isDistributed() && !event.isOriginRemote()) {
@@ -1902,10 +1756,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
@Override
- protected void basicClear(RegionEventImpl ev) {
+ protected void basicClear(RegionEventImpl regionEvent) {
Lock dlock = this.getRegionDistributedLockIfGlobal();
try {
- super.basicClear(ev);
+ super.basicClear(regionEvent);
} finally {
if (dlock != null)
dlock.unlock();
@@ -1919,7 +1773,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
do {
// non-replicate regions must defer to a replicate for clear/invalidate of region
Set<InternalDistributedMember> repls = this.distAdvisor.adviseReplicates();
- if (repls.size() > 0) {
+ if (!repls.isEmpty()) {
InternalDistributedMember mbr = repls.iterator().next();
RemoteRegionOperation op = RemoteRegionOperation.clear(mbr, this);
try {
@@ -1949,7 +1803,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// to suspend locking, which is what distributedLockForClear() does. We don't
// want that to happen, so we'll synchronize to make sure only one thread on
// this member performs a clear.
- synchronized (clearLock) {
+ synchronized (this.clearLock) {
if (enableRVV) {
distributedLockForClear();
@@ -2016,9 +1870,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
/**
* obtain locks preventing generation of new versions in other members
- *
- * @param participants
- **/
+ */
private void obtainWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
@@ -2029,12 +1881,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* pause local operations so that a clear() can be performed and flush comm channels to the given
* member
*/
- public void lockLocallyForClear(DM dm, InternalDistributedMember locker, CacheEvent event) {
+ void lockLocallyForClear(DM dm, InternalDistributedMember locker, CacheEvent event) {
RegionVersionVector rvv = getVersionVector();
- ARMLockTestHook alth = getRegionMap().getARMLockTestHook();
- if (alth != null)
- alth.beforeLock(this, event);
+ ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
+ if (armLockTestHook != null) {
+ armLockTestHook.beforeLock(this, event);
+ }
if (rvv != null) {
// block new operations from being applied to the region map
@@ -2044,46 +1897,44 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
checkReadiness();
// Only need to flush if NOACK at this point
if (this.getAttributes().getScope().isDistributedNoAck()) {
- Set<InternalDistributedMember> mbrs = getDistributionAdvisor().adviseCacheOp();
- StateFlushOperation.flushTo(mbrs, this);
+ Set<InternalDistributedMember> members = getDistributionAdvisor().adviseCacheOp();
+ StateFlushOperation.flushTo(members, this);
}
}
- if (alth != null)
- alth.afterLock(this, null);
-
+ if (armLockTestHook != null) {
+ armLockTestHook.afterLock(this, null);
+ }
}
/**
* releases the locks obtained in obtainWriteLocksForClear
- *
- * @param participants
*/
private void releaseWriteLocksForClear(RegionEventImpl regionEvent,
Set<InternalDistributedMember> participants) {
- ARMLockTestHook alth = getRegionMap().getARMLockTestHook();
- if (alth != null)
- alth.beforeRelease(this, regionEvent);
+ ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
+ if (armLockTestHook != null) {
+ armLockTestHook.beforeRelease(this, regionEvent);
+ }
getVersionVector().unlockForClear(getMyId());
DistributedClearOperation.releaseLocks(regionEvent, participants);
- if (alth != null)
- alth.afterRelease(this, regionEvent);
-
+ if (armLockTestHook != null) {
+ armLockTestHook.afterRelease(this, regionEvent);
+ }
}
/**
* Wait for in progress clears that were initiated by this member.
*/
private void waitForInProgressClear() {
-
RegionVersionVector rvv = getVersionVector();
if (rvv != null) {
- synchronized (clearLock) {
+ synchronized (this.clearLock) {
// do nothing;
- // DAN - I'm a little scared that the compiler might optimize
+ // I'm a little scared that the compiler might optimize
// away this synchronization if we really do nothing. Hence
// my fine log message below. This might not be necessary.
if (logger.isDebugEnabled()) {
@@ -2107,12 +1958,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return eventId;
}
- // test hook for DistributedAckRegionCCEDUnitTest
- public static boolean LOCALCLEAR_TESTHOOK;
-
@Override
void basicLocalClear(RegionEventImpl rEvent) {
- if (getScope().isDistributed() && getDataPolicy().withReplication() && !LOCALCLEAR_TESTHOOK) {
+ if (getScope().isDistributed() && getDataPolicy().withReplication()) {
throw new UnsupportedOperationException(
LocalizedStrings.DistributedRegion_LOCALCLEAR_IS_NOT_SUPPORTED_ON_DISTRIBUTED_REPLICATED_REGIONS
.toLocalizedString());
@@ -2124,64 +1972,63 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return getSystem().getDistributionManager().getConfig();
}
- /*
- * @see SearchLoadAndWriteProcessor#initialize(LocalRegion, Object, Object)
- */
- public final CacheDistributionAdvisor getDistributionAdvisor() {
+ @Override
+ public CacheDistributionAdvisor getDistributionAdvisor() {
return this.distAdvisor;
}
+ @Override
public CacheDistributionAdvisor getCacheDistributionAdvisor() {
return this.distAdvisor;
}
- public final PersistenceAdvisor getPersistenceAdvisor() {
+ public PersistenceAdvisor getPersistenceAdvisor() {
return this.persistenceAdvisor;
}
- public final PersistentMemberID getPersistentID() {
+ public PersistentMemberID getPersistentID() {
return this.persistentId;
}
/** Returns the distribution profile; lazily creates one if needed */
+ @Override
public Profile getProfile() {
return this.distAdvisor.createProfile();
}
- public void fillInProfile(Profile p) {
- assert p instanceof CacheProfile;
- CacheProfile profile = (CacheProfile) p;
- profile.dataPolicy = getDataPolicy();
- profile.hasCacheLoader = basicGetLoader() != null;
- profile.hasCacheWriter = basicGetWriter() != null;
- profile.hasCacheListener = hasListener();
+ @Override
+ public void fillInProfile(Profile profile) {
+ assert profile instanceof CacheProfile;
+ CacheProfile cacheProfile = (CacheProfile) profile;
+ cacheProfile.dataPolicy = getDataPolicy();
+ cacheProfile.hasCacheLoader = basicGetLoader() != null;
+ cacheProfile.hasCacheWriter = basicGetWriter() != null;
+ cacheProfile.hasCacheListener = hasListener();
Assert.assertTrue(this.scope.isDistributed());
- profile.scope = this.scope;
- profile.inRecovery = getImageState().getInRecovery();
- profile.isPersistent = getDataPolicy().withPersistence();
- profile.setSubscriptionAttributes(getSubscriptionAttributes());
- // Kishor : Below PDX check is added for rolling upgrade support. We are
+ cacheProfile.scope = this.scope;
+ cacheProfile.inRecovery = getImageState().getInRecovery();
+ cacheProfile.isPersistent = getDataPolicy().withPersistence();
+ cacheProfile.setSubscriptionAttributes(getSubscriptionAttributes());
+
+ // Below PDX check is added for rolling upgrade support. We are
// removing Old wan in this checkin. PDX region are always gatewayEnabled
// irrespective whether gatewayHub is configured or not.
// Old version Pdx region always has this attribute true so to avoid region
// attribute comparison across member we are setting it to true.
- if (this.isPdxTypesRegion()) {
- profile.isGatewayEnabled = true;
- } else {
- profile.isGatewayEnabled = false;
- }
- profile.serialNumber = getSerialNumber();
- profile.regionInitialized = this.isInitialized();
- profile.persistentID = getPersistentID();
+
+ cacheProfile.isGatewayEnabled = isPdxTypesRegion();
+ cacheProfile.serialNumber = getSerialNumber();
+ cacheProfile.regionInitialized = isInitialized();
+ cacheProfile.persistentID = getPersistentID();
if (getPersistenceAdvisor() != null) {
- profile.persistenceInitialized = getPersistenceAdvisor().isOnline();
+ cacheProfile.persistenceInitialized = getPersistenceAdvisor().isOnline();
}
- profile.hasCacheServer = ((this.cache.getCacheServers().size() > 0) ? true : false);
- profile.requiresOldValueInEvents = this.dataPolicy.withReplication()
+ cacheProfile.hasCacheServer = this.cache.getCacheServers().size() > 0 ? true : false;
+ cacheProfile.requiresOldValueInEvents = this.dataPolicy.withReplication()
&& this.filterProfile != null && this.filterProfile.hasCQs();
- profile.gatewaySenderIds = getGatewaySenderIds();
- profile.asyncEventQueueIds = getVisibleAsyncEventQueueIds();
- profile.isOffHeap = getOffHeap();
+ cacheProfile.gatewaySenderIds = getGatewaySenderIds();
+ cacheProfile.asyncEventQueueIds = getVisibleAsyncEventQueueIds();
+ cacheProfile.isOffHeap = getOffHeap();
}
/**
@@ -2190,25 +2037,20 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*/
public DistributedLockService getLockService() {
synchronized (this.dlockMonitor) {
- // Assert.assertTrue(this.scope.isGlobal()); since 7.0 this is used for distributing clear()
- // ops
-
- String svcName = getFullPath();
+ String dlsName = getFullPath();
if (this.dlockService == null) {
- this.dlockService = DistributedLockService.getServiceNamed(svcName);
+ this.dlockService = DistributedLockService.getServiceNamed(dlsName);
if (this.dlockService == null) {
- this.dlockService = DLockService.create(getFullPath(), getSystem(),
- true /* distributed */, false /* destroyOnDisconnect */, // region destroy will
- // destroy dls
- false /* automateFreeResources */); // manual freeResources only
+ // region destroy will destroy dls and manual freeResources only
+ this.dlockService = DLockService.create(getFullPath(), getSystem(), true, false, false);
}
// handle is-lock-grantor region attribute...
if (this.isLockGrantor) {
this.dlockService.becomeLockGrantor();
}
if (logger.isDebugEnabled()) {
- logger.debug("LockService for {} is using LockLease={}, LockTimeout=", svcName,
+ logger.debug("LockService for {} is using LockLease={}, LockTimeout={}", dlsName,
getCache().getLockLease(), getCache().getLockTimeout());
}
}
@@ -2216,21 +2058,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- /**
- * @see LocalRegion#isCurrentlyLockGrantor()
- */
@Override
protected boolean isCurrentlyLockGrantor() {
- if (!this.scope.isGlobal())
- return false;
- return getLockService().isLockGrantor();
+ return this.scope.isGlobal() && getLockService().isLockGrantor();
}
@Override
public boolean isLockGrantor() {
- if (!this.scope.isGlobal())
- return false;
- return this.isLockGrantor;
+ return this.scope.isGlobal() && this.isLockGrantor;
}
@Override
@@ -2261,13 +2096,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
/** @return the deserialized value */
@Override
@Retained
- protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateInterface txState,
+ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateInterface tx,
boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) throws CacheLoaderException, TimeoutException {
+
checkForLimitedOrNoAccess();
- RegionEntry re = null;
final Object key = keyInfo.getKey();
final Object aCallbackArgument = keyInfo.getCallbackArg();
Operation op;
@@ -2276,30 +2111,26 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
} else {
op = Operation.UPDATE;
}
- long lastModified = 0L;
- boolean fromServer = false;
@Released
EntryEventImpl event = null;
- @Retained
- Object result = null;
try {
- {
- if (this.srp != null) {
- VersionTagHolder holder = new VersionTagHolder();
- Object value = this.srp.get(key, aCallbackArgument, holder);
- fromServer = value != null;
- if (fromServer) {
- event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(),
- generateCallbacks);
- event.setVersionTag(holder.getVersionTag());
- event.setFromServer(fromServer); // fix for bug 39358
- if (clientEvent != null && clientEvent.getVersionTag() == null) {
- clientEvent.setVersionTag(holder.getVersionTag());
- }
+ boolean fromServer = false;
+ if (this.serverRegionProxy != null) {
+ VersionTagHolder holder = new VersionTagHolder();
+ Object value = this.serverRegionProxy.get(key, aCallbackArgument, holder);
+ fromServer = value != null;
+ if (fromServer) {
+ event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, false, getMyId(),
+ generateCallbacks);
+ event.setVersionTag(holder.getVersionTag());
+ event.setFromServer(true); // fix for bug 39358
+ if (clientEvent != null && clientEvent.getVersionTag() == null) {
+ clientEvent.setVersionTag(holder.getVersionTag());
}
}
}
+ long lastModified = 0L;
if (!fromServer) {
// Do not generate Event ID
event = EntryEventImpl.create(this, op, key, null /* newValue */, aCallbackArgument, false,
@@ -2315,7 +2146,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
try {
processor.initialize(this, key, aCallbackArgument);
// processor fills in event
- processor.doSearchAndLoad(event, txState, localValue);
+ processor.doSearchAndLoad(event, tx, localValue);
if (clientEvent != null && clientEvent.getVersionTag() == null) {
clientEvent.setVersionTag(event.getVersionTag());
}
@@ -2325,15 +2156,17 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
} else {
if (logger.isDebugEnabled()) {
- logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
- + getFullPath() + "; key=" + key);
+ logger.debug(
+ "DistributedRegion.findObjectInSystem skipping loader for region={}; key={}",
+ getFullPath(), key);
}
}
}
+ RegionEntry re = null;
if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
try {
// Set eventId. Required for interested clients.
- event.setNewEventId(cache.getDistributedSystem());
+ event.setNewEventId(this.cache.getDistributedSystem());
long startPut = CachePerfStats.getStatTime();
validateKey(key);
@@ -2345,17 +2178,17 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// set the tail key so that the event is passed to GatewaySender queues.
// if the tailKey is not set, the event gets filtered out in ParallelGatewaySenderQueue
if (this instanceof BucketRegion) {
- if (((BucketRegion) this).getPartitionedRegion().isParallelWanEnabled())
+ if (((Bucket) this).getPartitionedRegion().isParallelWanEnabled())
((BucketRegion) this).handleWANEvent(event);
}
re = basicPutEntry(event, lastModified);
- } catch (ConcurrentCacheModificationException e) {
+ } catch (ConcurrentCacheModificationException ignore) {
// the cache was modified while we were searching for this entry and
// the netsearch result was elided. Return the current value from the cache
re = getRegionEntry(key);
if (re != null) {
- event.setNewValue(re.getValue(this)); // OFFHEAP: need to incrc, copy to heap to
- // setNewValue, decrc
+ // OFFHEAP: need to incrc, copy to heap to setNewValue, decrc
+ event.setNewValue(re.getValue(this));
}
}
if (!isTX()) {
@@ -2371,6 +2204,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
recordMiss(re, key);
}
+ @Retained
+ Object result;
if (preferCD) {
result = event.getRawNewValueAsHeapObject();
} else {
@@ -2385,17 +2220,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
/**
- * hook for subclasses to note that a cache load was performed
- *
- * @see BucketRegion#performedLoad
- */
- // void performedLoad(EntryEventImpl event, long lastModifiedTime, TXState txState)
- // throws CacheWriterException {
- // // no action in DistributedRegion
- // }
-
- /**
- * @see LocalRegion#cacheWriteBeforeDestroy(EntryEventImpl, Object)
* @return true if cacheWrite was performed
*/
@Override
@@ -2430,9 +2254,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return result;
}
- /**
- * @see LocalRegion#cacheWriteBeforeRegionDestroy(RegionEventImpl)
- */
@Override
boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
throws CacheWriterException, TimeoutException {
@@ -2441,7 +2262,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
CacheWriter localWriter = basicGetWriter();
Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
- if (localWriter != null || (netWriteRecipients != null && !netWriteRecipients.isEmpty())) {
+ if (localWriter != null || netWriteRecipients != null && !netWriteRecipients.isEmpty()) {
final long start = getCachePerfStats().startCacheWriterCall();
try {
SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
@@ -2473,16 +2294,16 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- if (persistenceAdvisor != null) {
+ if (this.persistenceAdvisor != null) {
this.persistenceAdvisor.close(); // fix for bug 41094
}
this.distAdvisor.close();
- DLockService dls = null;
// Fix for bug 46338. Wait for in progress clears before destroying the
// lock service, because destroying the service immediately releases the dlock
waitForInProgressClear();
+ DLockService dls = null;
synchronized (this.dlockMonitor) {
if (this.dlockService != null) {
dls = (DLockService) this.dlockService;
@@ -2533,13 +2354,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
Set others = this.advisorListener.getInitialMembers();
CacheListener[] listeners = fetchCacheListenersField();
if (listeners != null) {
- for (int i = 0; i < listeners.length; i++) {
- if (listeners[i] instanceof RegionMembershipListener) {
- RegionMembershipListener rml = (RegionMembershipListener) listeners[i];
+ for (CacheListener listener : listeners) {
+ if (listener instanceof RegionMembershipListener) {
+ RegionMembershipListener regionMembershipListener = (RegionMembershipListener) listener;
try {
DistributedMember[] otherDms = new DistributedMember[others.size()];
others.toArray(otherDms);
- rml.initialMembers(this, otherDms);
+ regionMembershipListener.initialMembers(this, otherDms);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
@@ -2562,7 +2383,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
Set<String> allGatewaySenderIds = getAllGatewaySenderIds();
if (!allGatewaySenderIds.isEmpty()) {
- for (GatewaySender sender : cache.getAllGatewaySenders()) {
+ for (GatewaySender sender : this.cache.getAllGatewaySenders()) {
if (sender.isParallel() && allGatewaySenderIds.contains(sender.getId())) {
// Fix for Bug#51491. Once decided to support this configuration we have call
// addShadowPartitionedRegionForUserRR
@@ -2576,13 +2397,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
throw new GatewaySenderConfigurationException(
LocalizedStrings.ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1
.toLocalizedString(new Object[] {sender.getId(), this.getFullPath()}));
-
- // if (sender.isRunning()) {
- // ConcurrentParallelGatewaySenderQueue parallelQueue =
- // (ConcurrentParallelGatewaySenderQueue)((ParallelGatewaySenderImpl)sender)
- // .getQueues().toArray(new RegionQueue[1])[0];
- // parallelQueue.addShadowPartitionedRegionForUserRR(this);
- // }
}
}
}
@@ -2592,8 +2406,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
/**
* Free resources held by this region. This method is invoked after isDestroyed has been set to
* true.
- *
- * @see LocalRegion#postDestroyRegion(boolean, RegionEventImpl)
*/
@Override
protected void postDestroyRegion(boolean destroyDiskRegion, RegionEventImpl event) {
@@ -2605,7 +2417,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// I don't think this should ever happens: bulletproofing for bug 39454
logger.warn("postDestroyRegion: encountered cancellation", e);
}
-
}
@Override
@@ -2616,9 +2427,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
generateEventID());
distributeDestroyRegion(ev, true);
distributedRegionCleanup(null);
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// someone else must have concurrently destroyed the region (maybe a distributed destroy)
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// cache or DS is closed, ignore
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
@@ -2629,13 +2440,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
- /**
- * @see LocalRegion#handleCacheClose(Operation)
- */
@Override
- void handleCacheClose(Operation op) {
+ void handleCacheClose(Operation operation) {
try {
- super.handleCacheClose(op);
+ super.handleCacheClose(operation);
} finally {
distributedRegionCleanup(null);
}
@@ -2643,8 +2451,6 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
/**
* invoke a cache writer before a put is performed elsewhere
- *
- * @see LocalRegion#cacheWriteBeforePut(EntryEventImpl, Set, CacheWriter, boolean, Object)
*/
@Override
protected void cacheWriteBeforePut(EntryEventImpl event, Set netWriteRecipients,
@@ -2700,28 +2506,33 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
}
+ @Override
public void addGatewaySenderId(String gatewaySenderId) {
super.addGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
}
+ @Override
public void removeGatewaySenderId(String gatewaySenderId) {
super.removeGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
}
+ @Override
public void addAsyncEventQueueId(String asyncEventQueueId) {
super.addAsyncEventQueueId(asyncEventQueueId);
new UpdateAttributesProcessor(this).distribute();
}
+ @Override
public void removeAsyncEventQueueId(String asyncEventQueueId) {
super.removeAsyncEventQueueId(asyncEventQueueId);
new UpdateAttributesProcessor(this).distribute();
}
+ @Override
public void checkSameSenderIdsAvailableOnAllNodes() {
- List senderIds =
+ List<Set<String>> senderIds =
this.getCacheDistributionAdvisor().adviseSameGatewaySenderIds(getGatewaySenderIds());
if (!senderIds.isEmpty()) {
throw new GatewaySenderConfigurationException(
@@ -2730,7 +2541,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
new Object[] {this.getName(), senderIds.get(0), senderIds.get(1)}));
}
- List asycnQueueIds = this.getCacheDistributionAdvisor()
+ List<Set<String>> asycnQueueIds = this.getCacheDistributionAdvisor()
.adviseSameAsyncEventQueueIds(getVisibleAsyncEventQueueIds());
if (!asycnQueueIds.isEmpty()) {
throw new GatewaySenderConfigurationException(
@@ -2778,8 +2589,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (!dlock.tryLock(timeLeft, TimeUnit.SECONDS)) {
msg =
LocalizedStrings.DistributedRegion_ATTEMPT_TO_ACQUIRE_DISTRIBUTED_LOCK_FOR_0_FAILED_AFTER_WAITING_1_SECONDS;
- msgArgs =
- new Object[] {key, Long.valueOf((System.currentTimeMillis() - start) / 1000L)};
+ msgArgs = new Object[] {key, (System.currentTimeMillis() - start) / 1000L};
break;
}
@@ -2787,9 +2597,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
} catch (InterruptedException ex) {
interrupted = true;
this.cache.getCancelCriterion().checkCancelInProgress(ex);
- // FIXME Why is it OK to keep going?
+ // TODO: Why is it OK to keep going?
if (lockTimeout > -1) {
- timeLeft = getCache().getLockTimeout() - ((System.currentTimeMillis() - start) / 1000L);
+ timeLeft = getCache().getLockTimeout() - (System.currentTimeMillis() - start) / 1000L;
}
} finally {
if (interrupted) {
@@ -2800,7 +2610,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (msg == null) {
msg =
LocalizedStrings.DistributedRegion_TIMED_OUT_AFTER_WAITING_0_SECONDS_FOR_THE_DISTRIBUTED_LOCK_FOR_1;
- msgArgs = new Object[] {Integer.valueOf(getCache().getLockTimeout()), key};
+ msgArgs = new Object[] {getCache().getLockTimeout(), key};
}
throw new TimeoutException(msg.toLocalizedString(msgArgs));
} else {
@@ -2812,10 +2622,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* Checks if the entry is a valid entry
*
* @return true if entry not null or entry is not removed
- *
*/
protected boolean checkEntryNotValid(RegionEntry mapEntry) {
- return (mapEntry == null || (mapEntry.isRemoved() && !mapEntry.isTombstone()));
+ return mapEntry == null || mapEntry.isRemoved() && !mapEntry.isTombstone();
}
/**
@@ -2823,6 +2632,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* an iterator that uses hash ordering from the entry map, or, in the case of an overflow region,
* an iterator that iterates over the entries in disk order.
*/
+ @Override
public Iterator<RegionEntry> getBestIterator(boolean includeValues) {
DiskRegion dr = this.getDiskRegion();
@@ -2830,20 +2640,12 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
// Wait for the disk region to recover values first.
dr.waitForAsyncRecovery();
if (dr.getNumOverflowOnDisk() > 0) {
- return new DiskSavyIterator();
+ return new DiskSavvyIterator();
}
}
return this.entries.regionEntries().iterator();
}
- // /**
- // * The maximum number of entries that can be put into the diskMap before
- // * some of them are read from disk and returned by this iterator.
- // * The larger this number the more memory this iterator is allowed to consume
- // * and the better it will do in optimally reading the pending entries.
- // */
- // static final long MAX_PENDING_ENTRIES = Long.getLong("gemfire.MAX_PENDING_ENTRIES",
- // 1000000).longValue();
/**
* Should only be used if this region has entries on disk that are not in memory. This currently
* happens for overflow and for recovery when values are not recovered. The first iteration does a
@@ -2851,26 +2653,19 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* it saves it in a list sorted by the location on disk. Once the regionEntries iterator has
* nothing more to iterate it starts iterating over, in disk order, the entries on disk.
*/
- private class DiskSavyIterator implements Iterator<RegionEntry> {
+ private class DiskSavvyIterator implements Iterator<RegionEntry> {
private boolean usingIt = true;
+
private Iterator<?> it = entries.regionEntries().iterator();
+
// iterator for nested ArrayLists
private Iterator<RegionEntry> subIt = null;
- // private final ArrayList<DiskPosition> diskList = new ArrayList<DiskPosition>(/*@todo presize
- // based on number of entries only on disk*/);
- // value will be either RegionEntry or an ArrayList<RegionEntry>
- // private long pendingCount = 0;
- private final java.util.TreeMap<DiskPage, Object> diskMap =
- new java.util.TreeMap<DiskPage, Object>();
- // /**
- // * used to iterate over the fullest pages at the time we have
- // * added MAX_PENDING_ENTRIES to diskMap;
- // */
- // private Iterator<Map.Entry<DiskPage, Object>> sortedDiskIt;
+ private final TreeMap<DiskPage, Object> diskMap = new TreeMap<>();
- public DiskSavyIterator() {}
+ DiskSavvyIterator() {}
+ @Override
public boolean hasNext() {
boolean result;
if (this.subIt != null) {
@@ -2881,128 +2676,68 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return result;
}
}
- // if (this.sortedDiskIt != null) {
- // result = this.sortedDiskIt.hasNext();
- // if (!result) {
- // this.sortedDiskIt = null;
- // } else {
- // return result;
- // }
- // }
+
result = this.it.hasNext();
if (this.usingIt && !result) {
this.usingIt = false;
- // long start = System.currentTimeMillis();
- // Collections.sort(this.diskList);
- // long end = System.currentTimeMillis();
this.it = this.diskMap.values().iterator();
result = this.it.hasNext();
}
return result;
}
+ @Override
public RegionEntry next() {
for (;;) {
if (this.subIt != null) {
return this.subIt.next();
- // } else if (this.sortedDiskIt != null) {
- // Map.Entry<DiskPage, Object> me = this.sortedDiskIt.next();
- // // remove the page from the diskMap.
- // this.diskMap.remove(me.getKey());
- // Object v = me.getValue();
- // int size = 1;
- // if (v instanceof ArrayList) {
- // ArrayList al = (ArrayList)v;
- // size = al.size();
- // // set up the iterator to start returning the entries on that page
- // this.subIt = al.iterator();
- // v = this.subIt.next();
- // }
-
- // // decrement pendingCount by the number of entries on the page
- // this.pendingCount -= size;
- // // return the first region entry on this page
- // return v;
}
if (this.usingIt) {
- RegionEntry re = (RegionEntry) this.it.next();
- DiskPosition dp = new DiskPosition();
- if (re.isOverflowedToDisk(DistributedRegion.this, dp)) {
- // add dp to sorted list
- DiskPage dPage = new DiskPage(dp);
- Object v = this.diskMap.get(dPage);
- if (v == null) {
- this.diskMap.put(dPage, re);
- } else if (v instanceof ArrayList) {
- ArrayList al = (ArrayList) v;
- al.add(re);
+ RegionEntry regionEntry = (RegionEntry) this.it.next();
+ DiskPosition diskPosition = new DiskPosition();
+ if (regionEntry.isOverflowedToDisk(DistributedRegion.this, diskPosition)) {
+ // add diskPosition to sorted list
+ DiskPage dPage = new DiskPage(diskPosition);
+ Object value = this.diskMap.get(dPage);
+ if (value == null) {
+ this.diskMap.put(dPage, regionEntry);
+ } else if (value instanceof ArrayList) {
+ List list = (List) value;
+ list.add(regionEntry);
} else {
- ArrayList al = new ArrayList();
- al.add(v);
- al.add(re);
- this.diskMap.put(dPage, al);
+ List list = new ArrayList();
+ list.add(value);
+ list.add(regionEntry);
+ this.diskMap.put(dPage, list);
}
if (!hasNext()) {
assert false; // must be true
}
- // this.pendingCount++;
- // if (this.usingIt && this.pendingCount >= MAX_PENDING_ENTRIES) {
- // // find the pages that have the most entries
- // int largestPage = 1;
- // ArrayList<Map.Entry<DiskPage, Object>> largestPages
- // = new ArrayList<Map.Entry<DiskPage, Object>>();
- // for (Map.Entry<DiskPage, Object> me: this.diskMap.entrySet()) {
- // int meSize = 1;
- // if (me.getValue() instanceof ArrayList) {
- // meSize = ((ArrayList)me.getValue()).size();
- // }
- // if (meSize > largestPage) {
- // largestPage = meSize;
- // largestPages.clear(); // throw away smaller pages
- // largestPages.add(me);
- // } else if (meSize == largestPage) {
- // largestPages.add(me);
- // } else {
- // // ignore this page
- // }
- // }
- // Collections.sort(largestPages, new Comparator
- // <Map.Entry<DiskPage, Object>>() {
- // /**
- // * Note: this comparator imposes orderings that are inconsistent
- // * with equals.
- // */
- // public int compare(Map.Entry<DiskPage, Object> o1, Map.Entry<DiskPage, Object> o2) {
- // return o1.getKey().compareTo(o2.getKey());
- // }
- // });
- // this.sortedDiskIt = largestPages.iterator();
- // // loop around and fetch first value from sortedDiskIt
- // }
} else {
- return re;
+ return regionEntry;
}
} else {
- Object v = this.it.next();
- if (v instanceof ArrayList) {
- ArrayList al = (ArrayList) v;
- this.subIt = al.iterator();
+ Object value = this.it.next();
+ if (value instanceof ArrayList) {
+ List list = (List) value;
+ this.subIt = list.iterator();
return this.subIt.next();
} else {
- return (RegionEntry) v;
+ return (RegionEntry) value;
}
}
}
}
+ @Override
public void remove() {
throw new UnsupportedOperationException();
}
}
public static class DiskPosition implements Comparable<DiskPosition> {
- private long oplogId;
- private long offset;
+ long oplogId; // package-private to avoid synthetic accessor
+ long offset; // package-private to avoid synthetic accessor
DiskPosition() {}
@@ -3013,19 +2748,21 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
@Override
public int hashCode() {
+ // TODO: Object instantiation inside 'hashCode()' is bad
return Long.valueOf(this.oplogId ^ this.offset).hashCode();
}
@Override
- public boolean equals(Object o) {
- if (o instanceof DiskPosition) {
- DiskPosition other = (DiskPosition) o;
+ public boolean equals(Object obj) {
+ if (obj instanceof DiskPosition) {
+ DiskPosition other = (DiskPosition) obj;
return this.oplogId == other.oplogId && this.offset == other.offset;
} else {
return false;
}
}
+ @Override
public int compareTo(DiskPosition o) {
int result = Long.signum(this.oplogId - o.oplogId);
if (result == 0) {
@@ -3036,18 +2773,19 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("<").append(this.oplogId).append(":").append(this.offset).append(">");
+ StringBuilder sb = new StringBuilder();
+ sb.append('<').append(this.oplogId).append(':').append(this.offset).append('>');
return sb.toString();
}
}
+
static class DiskPage extends DiskPosition {
static final long DISK_PAGE_SIZE =
- Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "DISK_PAGE_SIZE", 8 * 1024L).longValue();
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "DISK_PAGE_SIZE", 8 << 10);
- DiskPage(DiskPosition dp) {
- this.setPosition(dp.oplogId, dp.offset / DISK_PAGE_SIZE);
+ DiskPage(DiskPosition diskPosition) {
+ this.setPosition(diskPosition.oplogId, diskPosition.offset / DISK_PAGE_SIZE);
}
}
@@ -3055,7 +2793,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* Returns the lock lease value to use for DistributedLock and RegionDistributedLock. -1 is
* supported as non-expiring lock.
*/
- protected long getLockLeaseForLock() {
+ long getLockLeaseForLock() { // package-private to avoid synthetic accessor
if (getCache().getLockLease() == -1) {
return -1;
}
@@ -3066,24 +2804,22 @@ public class DistributedReg
<TRUNCATED>