You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/08/17 18:17:42 UTC
[geode] 10/18: GEODE-6588: Cleanup AbstractUpdateOperation
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7e0888b2542bc027ec6889d007725b0dcbe029f7
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Thu May 20 15:43:21 2021 -0700
GEODE-6588: Cleanup AbstractUpdateOperation
---
.../internal/cache/AbstractUpdateOperation.java | 57 +--
.../internal/cache/CacheDistributionAdvisor.java | 34 +-
.../internal/cache/DistributedCacheOperation.java | 387 ++++++++++-----------
.../cache/DistributedCacheOperationTest.java | 11 +-
4 files changed, 224 insertions(+), 265 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 69bfa39..32df318 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -33,7 +33,6 @@ import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
-import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.VersionTag;
@@ -57,13 +56,13 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
private final long lastModifiedTime;
- public AbstractUpdateOperation(CacheEvent event, long lastModifiedTime) {
+ public AbstractUpdateOperation(CacheEvent<?, ?> event, long lastModifiedTime) {
super(event);
this.lastModifiedTime = lastModifiedTime;
}
@Override
- protected Set getRecipients() {
+ protected Set<InternalDistributedMember> getRecipients() {
CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor();
return advisor.adviseUpdate(getEvent());
}
@@ -72,11 +71,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
protected void initMessage(CacheOperationMessage msg, DirectReplyProcessor pr) {
super.initMessage(msg, pr);
AbstractUpdateMessage m = (AbstractUpdateMessage) msg;
- DistributedRegion region = getRegion();
- DistributionManager mgr = region.getDistributionManager();
- // [bruce] We might have to stop using cacheTimeMillis because it causes a skew between
- // lastModified and the version tag's timestamp
- m.lastModified = this.lastModifiedTime;
+ m.lastModified = lastModifiedTime;
}
private static final boolean ALWAYS_REPLICATE_UPDATES =
@@ -85,19 +80,12 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
/** @return whether we should do a local create for a remote one */
private static boolean shouldDoRemoteCreate(LocalRegion rgn, EntryEventImpl ev) {
DataPolicy dp = rgn.getAttributes().getDataPolicy();
- if (!rgn.isAllEvents() || (dp.withReplication() && rgn.isInitialized()
- && ev.getOperation().isUpdate() && !rgn.getConcurrencyChecksEnabled()
- // misordered CREATE and
- // UPDATE messages can
- // cause inconsistencies
- && !ALWAYS_REPLICATE_UPDATES)) {
- // we are not accepting all events
- // or we are a replicate and initialized and it was an update
- // (we exclude that latter to avoid resurrecting a key deleted in a replicate
- return false;
- } else {
- return true;
- }
+ // Return false if we are not accepting all events, or if we are an initialized replicate
+ // without concurrency checks and it was an update. The latter is excluded so that misordered
+ // CREATE and UPDATE messages cannot cause inconsistencies by resurrecting a deleted key.
+ return rgn.isAllEvents() && (!dp.withReplication() || !rgn.isInitialized()
+ || !ev.getOperation().isUpdate() || rgn.getConcurrencyChecksEnabled()
+ || ALWAYS_REPLICATE_UPDATES);
}
private static boolean checkIfToUpdateAfterCreateFailed(LocalRegion rgn, EntryEventImpl ev) {
@@ -200,7 +188,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
if (logger.isTraceEnabled()) {
logger.trace("Processing put key {} in region {}", ev.getKey(), rgn.getFullPath());
}
- updated = true;
} else {
// key not here, blocked by DESTROYED token or ConcurrentCacheModificationException
// thrown during second update attempt
@@ -211,7 +198,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
rgn.basicUpdate(ev, false, false, lastMod, true, invokeCallbacks, false);
if (thirdBasicUpdateSuccess) {
rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
- updated = true;
}
} else {
if (rgn.getVersionVector() != null && ev.getVersionTag() != null) {
@@ -264,25 +250,20 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
protected long lastModified;
@Override
- protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm)
+ protected boolean operateOnRegion(CacheEvent<?, ?> event, ClusterDistributionManager dm)
throws EntryNotFoundException {
EntryEventImpl ev = (EntryEventImpl) event;
DistributedRegion rgn = (DistributedRegion) ev.getRegion();
- DistributionManager mgr = dm;
- boolean sendReply = true; // by default tell caller to send ack
- // if (!rgn.hasSeenEvent((InternalCacheEvent)event)) {
if (!rgn.isCacheContentProxy()) {
basicOperateOnRegion(ev, rgn);
- }
- // }
- else {
+ } else {
if (logger.isDebugEnabled()) {
logger.debug("UpdateMessage: this cache has already seen this event {}", event);
}
}
- return sendReply;
+ return true; // tell caller to send ack
}
// @todo darrel: make this method static?
@@ -297,15 +278,15 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
logger.debug("Processing {}", this);
}
try {
- long time = this.lastModified;
+ long time = lastModified;
if (ev.getVersionTag() != null) {
checkVersionTag(rgn, ev.getVersionTag());
time = ev.getVersionTag().getVersionTimeStamp();
}
- this.appliedOperation = doPutOrCreate(rgn, ev, time);
+ appliedOperation = doPutOrCreate(rgn, ev, time);
} catch (ConcurrentCacheModificationException e) {
dispatchElidedEvent(rgn, ev);
- this.appliedOperation = false;
+ appliedOperation = false;
}
}
@@ -313,25 +294,25 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; lastModified=");
- buff.append(this.lastModified);
+ buff.append(lastModified);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.lastModified = in.readLong();
+ lastModified = in.readLong();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
- out.writeLong(this.lastModified);
+ out.writeLong(lastModified);
}
protected void checkVersionTag(DistributedRegion rgn, VersionTag tag) {
- RegionAttributes attr = rgn.getAttributes();
+ RegionAttributes<?, ?> attr = rgn.getAttributes();
if (attr.getConcurrencyChecksEnabled() && attr.getDataPolicy().withPersistence()
&& attr.getScope() != Scope.GLOBAL
&& (tag.getMemberID() == null || test_InvalidVersion)) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 5879cb6..3d49278 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -188,7 +188,8 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* Provide recipient information for an update or create operation.
*
*/
- Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
+ Set<InternalDistributedMember> adviseUpdate(final EntryEventImpl event)
+ throws IllegalStateException {
if (event.hasNewValue() || event.getOperation().isPutAll()) {
// only need to distribute it to members that want all events or cache data
return adviseAllEventsOrCached();
@@ -246,7 +247,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
});
} else {
StringBuilder badIds = new StringBuilder();
- Iterator biI = badList.iterator();
+ Iterator<InternalDistributedMember> biI = badList.iterator();
while (biI.hasNext()) {
badIds.append(biI.next().toString());
if (biI.hasNext()) {
@@ -254,8 +255,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
}
}
throw new IllegalStateException(
- String.format("Illegal Region Configuration for members: %s",
- badIds.toString()));
+ String.format("Illegal Region Configuration for members: %s", badIds));
}
}
@@ -265,7 +265,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* @return Set of Serializable members that have a CacheLoader installed; no reference to Set kept
* by advisor so caller is free to modify it
*/
- public Set adviseNetLoad() {
+ public Set<InternalDistributedMember> adviseNetLoad() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
@@ -279,7 +279,8 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
});
}
- public FilterRoutingInfo adviseFilterRouting(CacheEvent event, Set cacheOpRecipients) {
+ public FilterRoutingInfo adviseFilterRouting(CacheEvent<?, ?> event,
+ Set<InternalDistributedMember> cacheOpRecipients) {
FilterProfile fp = ((LocalRegion) event.getRegion()).getFilterProfile();
if (fp != null) {
return fp.getFilterRoutingInfoPart1(event, profiles, cacheOpRecipients);
@@ -310,7 +311,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
/**
* Same as adviseGeneric
*/
- public Set adviseDestroyRegion() {
+ public Set<InternalDistributedMember> adviseDestroyRegion() {
return adviseGeneric();
}
@@ -320,7 +321,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* @return Set of Serializable member ids that have a CacheWriter installed; no reference to Set
* kept by advisor so caller is free to modify it
*/
- public Set adviseNetWrite() {
+ public Set<InternalDistributedMember> adviseNetWrite() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
@@ -347,7 +348,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* @return Set of Serializable member ids that have the region and have storage (no need to
* search an empty cache)
*/
- Set adviseNetSearch() {
+ Set<InternalDistributedMember> adviseNetSearch() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
@@ -452,7 +453,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
*
* @since GemFire 5.5
*/
- Set adviseRequiresOldValueInCacheOp() {
+ Set<InternalDistributedMember> adviseRequiresOldValueInCacheOp() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
@@ -675,11 +676,11 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
public boolean getInRecovery() {
return inRecovery;
- };
+ }
public void setInRecovery(boolean recovery) {
inRecovery = recovery;
- };
+ }
public DataPolicy getDataPolicy() {
return dataPolicy;
@@ -743,9 +744,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* Return true if cached or allEvents and a listener
*/
boolean cachedOrAllEventsWithListener() {
- // to fix bug 36804 to ignore hasCacheListener
- // return this.dataPolicy.withStorage() ||
- // (allEvents() && this.hasCacheListener);
return cachedOrAllEvents();
}
@@ -1038,7 +1036,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* @return the set of preloaded's memberIds
* @since GemFire prPersistSprint1
*/
- public Set advisePreloadeds() {
+ public Set<InternalDistributedMember> advisePreloadeds() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
@@ -1052,7 +1050,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
* @return the set of replicate's memberIds
* @since GemFire 5.8
*/
- Set adviseEmptys() {
+ Set<InternalDistributedMember> adviseEmptys() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
@@ -1106,7 +1104,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
return result;
}
- Set adviseCacheServers() {
+ Set<InternalDistributedMember> adviseCacheServers() {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 4a53ef6..e32a2af 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.Vector;
import org.apache.logging.log4j.Logger;
@@ -138,11 +137,6 @@ public abstract class DistributedCacheOperation {
}
}
- // static values for oldValueIsObject
- public static final byte VALUE_IS_BYTES = 0;
- public static final byte VALUE_IS_SERIALIZED_OBJECT = 1;
- public static final byte VALUE_IS_OBJECT = 2;
-
/**
* Given a VALUE_IS_* constant convert and return the corresponding DESERIALIZATION_POLICY_*.
*/
@@ -169,9 +163,9 @@ public abstract class DistributedCacheOperation {
protected CacheOperationReplyProcessor processor = null;
- protected Set departedMembers;
+ protected Set<InternalDistributedMember> departedMembers;
- protected Set originalRecipients;
+ protected Set<InternalDistributedMember> originalRecipients;
@MutableForTesting
static Runnable internalBeforePutOutgoing;
@@ -188,7 +182,7 @@ public abstract class DistributedCacheOperation {
}
/** Creates a new instance of DistributedCacheOperation */
- public DistributedCacheOperation(CacheEvent event) {
+ public DistributedCacheOperation(CacheEvent<?, ?> event) {
this.event = (InternalCacheEvent) event;
}
@@ -200,24 +194,19 @@ public abstract class DistributedCacheOperation {
* @since GemFire 5.0
*/
boolean isOperationReliable() {
- Operation op = this.event.getOperation();
+ Operation op = event.getOperation();
if (!op.isRegionDestroy()) {
return true;
}
- if (op.isDistributed()) {
- return true;
- }
// must be a region destroy that is "local" which means
// Region.localDestroyRegion or Region.close or Cache.close
// none of these should do reliability checks
- return false;
+ return op.isDistributed();
}
public boolean supportsDirectAck() {
// force use of shared connection if we're already in a secondary
- // thread-owned reader thread. See bug #49565. Also see Connection#processNIOBuffer
- // int dominoCount = org.apache.geode.internal.tcp.Connection.getDominoCount();
- // return dominoCount < 2;
+ // thread-owned reader thread. See Connection#processNIOBuffer
return true;
}
@@ -267,7 +256,7 @@ public abstract class DistributedCacheOperation {
DistributedRegion region = getRegion();
long viewVersion = -1;
try {
- if (this.containsRegionContentChange()) {
+ if (containsRegionContentChange()) {
viewVersion = region.getDistributionAdvisor().startOperation();
}
if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
@@ -354,13 +343,13 @@ public abstract class DistributedCacheOperation {
}
// some members requiring old value are also in the cache op recipients set
- Set needsOldValueInCacheOp = Collections.emptySet();
+ Set<InternalDistributedMember> needsOldValueInCacheOp = Collections.emptySet();
// set client routing information into the event
boolean routingComputed = false;
FilterRoutingInfo filterRouting = null;
// recipients that will get a cacheop msg and also a PR message
- Set twoMessages = Collections.emptySet();
+ Set<InternalDistributedMember> twoMessages = Collections.emptySet();
if (region.isUsedForPartitionedRegionBucket()) {
twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
routingComputed = true;
@@ -374,12 +363,12 @@ public abstract class DistributedCacheOperation {
// some members need PR notification of the change for client/wan
// notification
- Set adjunctRecipients = Collections.emptySet();
+ Set<InternalDistributedMember> adjunctRecipients = Collections.emptySet();
// Partitioned region listener notification messages piggyback on this
// operation's replyprocessor and need to be sent at the same time as
// the operation's message
- if (this.supportsAdjunctMessaging() && region.isUsedForPartitionedRegionBucket()) {
+ if (supportsAdjunctMessaging() && region.isUsedForPartitionedRegionBucket()) {
BucketRegion br = (BucketRegion) region;
adjunctRecipients = getAdjunctReceivers(br, recipients, twoMessages, filterRouting);
}
@@ -388,7 +377,7 @@ public abstract class DistributedCacheOperation {
if (entryEvent != null && entryEvent.hasOldValue()) {
if (testSendingOldValues) {
- needsOldValueInCacheOp = new HashSet(recipients);
+ needsOldValueInCacheOp = new HashSet<>(recipients);
} else {
needsOldValueInCacheOp =
region.getCacheDistributionAdvisor().adviseRequiresOldValueInCacheOp();
@@ -396,14 +385,14 @@ public abstract class DistributedCacheOperation {
recipients.removeAll(needsOldValueInCacheOp);
}
- Set cachelessNodes = Collections.emptySet();
- Set adviseCacheServers;
+ Set<InternalDistributedMember> cachelessNodes = Collections.emptySet();
+ Set<InternalDistributedMember> adviseCacheServers;
Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = Collections.emptySet();
- if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
+ if (region.getDistributionConfig().getDeltaPropagation() && supportsDeltaPropagation()) {
cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
if (!cachelessNodes.isEmpty()) {
- List list = new ArrayList(cachelessNodes);
- for (Object member : cachelessNodes) {
+ List<InternalDistributedMember> list = new ArrayList<>(cachelessNodes);
+ for (InternalDistributedMember member : cachelessNodes) {
if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
// Don't include those originally excluded.
list.remove(member);
@@ -437,26 +426,24 @@ public abstract class DistributedCacheOperation {
}
}
}
- if (!reliableOp || region.isNoDistributionOk()) {
- // nothing needs be done in this case
- } else {
+ if (reliableOp && !region.isNoDistributionOk()) {
region.handleReliableDistribution(Collections.emptySet());
}
// compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
- this.event.setLocalFilterInfo(filterInfo);
+ event.setLocalFilterInfo(filterInfo);
}
} else {
boolean directAck = false;
boolean useMulticast = region.getMulticastEnabled()
- && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
+ && region.getSystem().getConfig().getMcastPort() != 0 && supportsMulticast();
boolean shouldAck = shouldAck();
if (shouldAck) {
- if (this.supportsDirectAck() && adjunctRecipients.isEmpty()) {
+ if (supportsDirectAck() && adjunctRecipients.isEmpty()) {
if (region.getSystem().threadOwnsResources()) {
directAck = true;
}
@@ -481,28 +468,25 @@ public abstract class DistributedCacheOperation {
if (shouldAck) {
// adjunct messages are sent using the same reply processor, so
// add them to the processor's membership set
- Collection waitForMembers = null;
- if (recipients.size() > 0 && adjunctRecipients.size() == 0 && cachelessNodes.isEmpty()) { // the
- // common
- // case
+ final Collection<InternalDistributedMember> waitForMembers;
+ if (recipients.size() > 0 && adjunctRecipients.size() == 0 && cachelessNodes.isEmpty()) {
+ // the common case
waitForMembers = recipients;
} else if (!cachelessNodes.isEmpty()) {
- waitForMembers = new HashSet(recipients);
+ waitForMembers = new HashSet<>(recipients);
waitForMembers.addAll(cachelessNodes);
} else {
- // note that we use a Vector instead of a Set for the responders
- // collection
- // because partitioned regions sometimes send both a regular cache
- // operation and a partitioned-region notification message to the
- // same recipient
- waitForMembers = new Vector(recipients);
+ // note that we use a List instead of a Set for the responders
+ // collection because partitioned regions sometimes send both a regular cache
+ // operation and a partitioned-region notification message to the same recipient
+ waitForMembers = new ArrayList<>(recipients);
waitForMembers.addAll(adjunctRecipients);
waitForMembers.addAll(needsOldValueInCacheOp);
waitForMembers.addAll(cachelessNodes);
}
if (DistributedCacheOperation.LOSS_SIMULATION_RATIO != 0.0) {
if (LOSS_SIMULATION_GENERATOR == null) {
- LOSS_SIMULATION_GENERATOR = new Random(this.hashCode());
+ LOSS_SIMULATION_GENERATOR = new Random(hashCode());
}
if ((LOSS_SIMULATION_GENERATOR.nextInt(100) * 1.0 / 100.0) < LOSS_SIMULATION_RATIO) {
if (logger.isDebugEnabled()) {
@@ -514,24 +498,23 @@ public abstract class DistributedCacheOperation {
}
}
if (reliableOp) {
- this.departedMembers = new HashSet();
- this.processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers,
- this.departedMembers);
+ departedMembers = new HashSet<>();
+ processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers,
+ departedMembers);
} else {
- this.processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers);
+ processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers);
}
}
- Set failures = null;
CacheOperationMessage msg = createMessage();
- initMessage(msg, this.processor);
+ initMessage(msg, processor);
if (DistributedCacheOperation.internalBeforePutOutgoing != null) {
DistributedCacheOperation.internalBeforePutOutgoing.run();
}
if (processor != null && msg.isSevereAlertCompatible()) {
- this.processor.enableSevereAlertProcessing();
+ processor.enableSevereAlertProcessing();
// if this message is distributing for a partitioned region message,
// we can't wait as long as the full ack-severe-alert-threshold or
// the sender might kick us out of the system before we can get an ack
@@ -570,14 +553,14 @@ public abstract class DistributedCacheOperation {
}
msg.setRecipients(recipients);
- failures = mgr.putOutgoing(msg);
+ Set<InternalDistributedMember> failures = mgr.putOutgoing(msg);
// distribute to members needing the old value now
if (needsOldValueInCacheOp.size() > 0) {
- msg.appendOldValueToMessage((EntryEventImpl) this.event);
+ msg.appendOldValueToMessage((EntryEventImpl) event);
msg.resetRecipients();
msg.setRecipients(needsOldValueInCacheOp);
- Set newFailures = mgr.putOutgoing(msg);
+ Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg);
if (newFailures != null) {
if (logger.isDebugEnabled()) {
logger.debug("Failed sending ({}) to {}", msg, newFailures);
@@ -596,7 +579,7 @@ public abstract class DistributedCacheOperation {
msg.resetRecipients();
msg.setRecipients(cachelessNodes);
msg.setSendDelta(false);
- Set newFailures = mgr.putOutgoing(msg);
+ Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg);
if (newFailures != null) {
if (failures != null && failures.size() > 0) {
failures.addAll(newFailures);
@@ -611,7 +594,7 @@ public abstract class DistributedCacheOperation {
msg.setRecipients(cachelessNodesWithNoCacheServer);
msg.setSendDelta(false);
((UpdateMessage) msg).setSendDeltaWithFullValue(false);
- Set newFailures = mgr.putOutgoing(msg);
+ Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg);
if (newFailures != null) {
if (failures != null && failures.size() > 0) {
failures.addAll(newFailures);
@@ -628,7 +611,6 @@ public abstract class DistributedCacheOperation {
logger.debug("Failed sending ({}) to {} while processing event:{}", msg, failures, event);
}
- Set<InternalDistributedMember> adjunctRecipientsWithNoCacheServer = Collections.emptySet();
// send partitioned region listener notification messages now
if (!adjunctRecipients.isEmpty()) {
if (cachelessNodes.size() > 0) {
@@ -640,29 +622,31 @@ public abstract class DistributedCacheOperation {
recipients.addAll(cachelessNodes);
}
}
- adjunctRecipientsWithNoCacheServer = new HashSet<>(adjunctRecipients);
+
+ final Set<InternalDistributedMember> adjunctRecipientsWithNoCacheServer =
+ new HashSet<>(adjunctRecipients);
adviseCacheServers = ((Bucket) region).getPartitionedRegion()
.getCacheDistributionAdvisor().adviseCacheServers();
adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
if (isPutAll) {
((BucketRegion) region).performPutAllAdjunctMessaging((DistributedPutAllOperation) this,
- recipients, adjunctRecipients, filterRouting, this.processor);
+ recipients, adjunctRecipients, filterRouting, processor);
} else if (isRemoveAll) {
((BucketRegion) region).performRemoveAllAdjunctMessaging(
(DistributedRemoveAllOperation) this, recipients, adjunctRecipients, filterRouting,
- this.processor);
+ processor);
} else {
boolean calculateDelta =
adjunctRecipientsWithNoCacheServer.size() < adjunctRecipients.size();
adjunctRecipients.removeAll(adjunctRecipientsWithNoCacheServer);
if (!adjunctRecipients.isEmpty()) {
((BucketRegion) region).performAdjunctMessaging(getEvent(), recipients,
- adjunctRecipients, filterRouting, this.processor, calculateDelta, true);
+ adjunctRecipients, filterRouting, processor, calculateDelta, true);
}
if (!adjunctRecipientsWithNoCacheServer.isEmpty()) {
((BucketRegion) region).performAdjunctMessaging(getEvent(), recipients,
- adjunctRecipientsWithNoCacheServer, filterRouting, this.processor, calculateDelta,
+ adjunctRecipientsWithNoCacheServer, filterRouting, processor, calculateDelta,
false);
}
}
@@ -674,10 +658,10 @@ public abstract class DistributedCacheOperation {
event.setLocalFilterInfo(filterInfo);
}
- waitForAckIfNeeded(msg, persistentIds);
+ waitForAckIfNeeded(persistentIds);
if (/* msg != null && */reliableOp) {
- Set successfulRecips = new HashSet(recipients);
+ Set<InternalDistributedMember> successfulRecips = new HashSet<>(recipients);
successfulRecips.addAll(cachelessNodes);
successfulRecips.addAll(needsOldValueInCacheOp);
if (failures != null && !failures.isEmpty()) {
@@ -747,8 +731,8 @@ public abstract class DistributedCacheOperation {
// the entry from CQ cache.
if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
&& e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)
- && ((EntryOperation) event).getKey() != null) {
- cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
+ && ((EntryOperation<?, ?>) event).getKey() != null) {
+ cq.removeFromCqResultKeys(((EntryOperation<?, ?>) event).getKey(), true);
}
}
}
@@ -762,9 +746,10 @@ public abstract class DistributedCacheOperation {
* messages
* @param routing client routing information
*/
- Set getAdjunctReceivers(BucketRegion br, Set cacheOpReceivers, Set twoMessages,
+ Set<InternalDistributedMember> getAdjunctReceivers(BucketRegion br,
+ Set<InternalDistributedMember> cacheOpReceivers, Set<InternalDistributedMember> twoMessages,
FilterRoutingInfo routing) {
- return br.getAdjunctReceivers(this.getEvent(), cacheOpReceivers, twoMessages, routing);
+ return br.getAdjunctReceivers(getEvent(), cacheOpReceivers, twoMessages, routing);
}
/**
@@ -774,16 +759,16 @@ public abstract class DistributedCacheOperation {
// nothing to do here - see UpdateMessage
}
- protected void waitForAckIfNeeded(CacheOperationMessage msg,
+ protected void waitForAckIfNeeded(
Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
- if (this.processor == null) {
+ if (processor == null) {
return;
}
try {
// keep waiting even if interrupted
try {
- this.processor.waitForRepliesUninterruptibly();
- Set<InternalDistributedMember> closedMembers = this.processor.closedMembers.getSnapshot();
+ processor.waitForRepliesUninterruptibly();
+ Set<InternalDistributedMember> closedMembers = processor.closedMembers.getSnapshot();
handleClosedMembers(closedMembers, persistentIds);
} catch (ReplyException e) {
if (this instanceof DestroyRegionOperation) {
@@ -792,7 +777,7 @@ public abstract class DistributedCacheOperation {
e.handleCause();
}
} finally {
- this.processor = null;
+ processor = null;
}
}
@@ -828,27 +813,28 @@ public abstract class DistributedCacheOperation {
}
protected DistributedRegion getRegion() {
- return (DistributedRegion) this.event.getRegion();
+ return (DistributedRegion) event.getRegion();
}
protected EntryEventImpl getEvent() {
- return (EntryEventImpl) this.event;
+ return (EntryEventImpl) event;
}
- protected Set getRecipients() {
+ protected Set<InternalDistributedMember> getRecipients() {
CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor();
- this.originalRecipients = advisor.adviseCacheOp();
- return this.originalRecipients;
+ originalRecipients = advisor.adviseCacheOp();
+ return originalRecipients;
}
- protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
+ protected FilterRoutingInfo getRecipientFilterRouting(
+ Set<InternalDistributedMember> cacheOpRecipients) {
LocalRegion region = getRegion();
if (!region.isUsedForPartitionedRegionBucket()) {
return null;
}
CacheDistributionAdvisor advisor;
advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
- return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
+ return advisor.adviseFilterRouting(event, cacheOpRecipients);
}
/**
@@ -860,7 +846,7 @@ public abstract class DistributedCacheOperation {
if (fp == null) {
return null;
}
- FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(frInfo, this.event);
+ FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(frInfo, event);
if (fri == null) {
return null;
}
@@ -873,7 +859,7 @@ public abstract class DistributedCacheOperation {
msg.regionPath = getRegion().getFullPath();
msg.processorId = p == null ? 0 : p.getProcessorId();
msg.processor = p;
- if (this.event.getOperation().isEntry()) {
+ if (event.getOperation().isEntry()) {
EntryEventImpl entryEvent = getEvent();
msg.callbackArg = entryEvent.getRawCallbackArgument();
msg.possibleDuplicate = entryEvent.isPossibleDuplicate();
@@ -885,9 +871,9 @@ public abstract class DistributedCacheOperation {
}
} else {
- msg.callbackArg = ((RegionEventImpl) this.event).getRawCallbackArgument();
+ msg.callbackArg = ((RegionEventImpl) event).getRawCallbackArgument();
}
- msg.op = this.event.getOperation();
+ msg.op = event.getOperation();
msg.owner = this;
msg.regionAllowsConflation = getRegion().getEnableAsyncConflation();
@@ -896,7 +882,7 @@ public abstract class DistributedCacheOperation {
@Override
public String toString() {
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
- return cname + "(" + this.event + ")";
+ return cname + "(" + event + ")";
}
/**
@@ -962,16 +948,16 @@ public abstract class DistributedCacheOperation {
protected boolean inhibitAllNotifications;
public Operation getOperation() {
- return this.op;
+ return op;
}
/** sets the concurrency versioning tag for this message */
public void setVersionTag(VersionTag tag) {
- this.versionTag = tag;
+ versionTag = tag;
}
public VersionTag getVersionTag() {
- return this.versionTag;
+ return versionTag;
}
@Override
@@ -988,21 +974,21 @@ public abstract class DistributedCacheOperation {
@Override
public void registerProcessor() {
if (processor != null) {
- this.processorId = this.processor.register();
+ processorId = processor.register();
}
- this.directAck = false;
+ directAck = false;
}
public void setFilterInfo(FilterRoutingInfo fInfo) {
- this.filterRouting = fInfo;
+ filterRouting = fInfo;
}
public void setInhibitNotificationsBit(boolean inhibit) {
- this.inhibitAllNotifications = inhibit;
+ inhibitAllNotifications = inhibit;
}
public String getRegionPath() {
- return this.regionPath;
+ return regionPath;
}
/**
@@ -1043,11 +1029,11 @@ public abstract class DistributedCacheOperation {
CqService cqService = event.getRegion().getCache().getCqService();
if (cqService.isRunning()/* || event.getOperation().guaranteesOldValue() */) {
event.setOldValueForQueryProcessing();
- if (!event.hasOldValue() && this.hasOldValue) {
- if (this.oldValueIsSerialized) {
- event.setSerializedOldValue((byte[]) this.oldValue);
+ if (!event.hasOldValue() && hasOldValue) {
+ if (oldValueIsSerialized) {
+ event.setSerializedOldValue((byte[]) oldValue);
} else {
- event.setOldValue(this.oldValue);
+ event.setOldValue(oldValue);
}
}
}
@@ -1059,15 +1045,15 @@ public abstract class DistributedCacheOperation {
* @since GemFire 6.1
*/
protected void setHasDelta(boolean flag) {
- this.hasDelta = flag;
+ hasDelta = flag;
}
protected boolean hasDelta() {
- return this.hasDelta;
+ return hasDelta;
}
public FilterRoutingInfo getFilterInfo() {
- return this.filterRouting;
+ return filterRouting;
}
/**
@@ -1079,7 +1065,7 @@ public abstract class DistributedCacheOperation {
@Override
public int getProcessorId() {
- return this.processorId;
+ return processorId;
}
@Override
@@ -1088,9 +1074,9 @@ public abstract class DistributedCacheOperation {
}
protected LocalRegion getLocalRegionForProcessing(ClusterDistributionManager dm) {
- Assert.assertTrue(this.regionPath != null, "regionPath was null");
+ Assert.assertTrue(regionPath != null, "regionPath was null");
InternalCache gfc = dm.getExistingCache();
- return (LocalRegion) gfc.getRegionByPathForProcessing(this.regionPath);
+ return (LocalRegion) gfc.getRegionByPathForProcessing(regionPath);
}
@Override
@@ -1098,11 +1084,11 @@ public abstract class DistributedCacheOperation {
Throwable thr = null;
boolean sendReply = true;
- if (this.versionTag != null) {
- this.versionTag.replaceNullIDs(getSender());
+ if (versionTag != null) {
+ versionTag.replaceNullIDs(getSender());
}
- EntryLogger.setSource(this.getSender(), "p2p");
+ EntryLogger.setSource(getSender(), "p2p");
final InitializationLevel oldLevel =
LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
try {
@@ -1115,7 +1101,7 @@ public abstract class DistributedCacheOperation {
sendReply = false;
basicProcess(dm, lclRgn);
} catch (CancelException ignore) {
- this.closed = true;
+ closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
}
@@ -1158,10 +1144,8 @@ public abstract class DistributedCacheOperation {
logger.trace("DistributedCacheOperation.basicProcess: {}", this);
}
try {
- // LocalRegion lclRgn = getRegionFromPath(dm.getSystem(),
- // this.regionPath);
if (lclRgn == null) {
- this.closed = true;
+ closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} region not found, nothing to do", this);
}
@@ -1169,8 +1153,6 @@ public abstract class DistributedCacheOperation {
}
// Could this cause a deadlock, because this can block a P2P reader
// thread which might be needed to read the create region reply??
- // DAN - I don't think this does anything because process called
- // LocalRegion.setThreadInitLevelRequirement
lclRgn.waitOnInitialization();
// In some subclasses, lclRgn may be destroyed, so be careful not to
// allow a RegionDestroyedException to be thrown on lclRgn access
@@ -1193,7 +1175,7 @@ public abstract class DistributedCacheOperation {
try {
boolean isEntry = event.getOperation().isEntry();
- if (isEntry && this.possibleDuplicate) {
+ if (isEntry && possibleDuplicate) {
((EntryEventImpl) event).setPossibleDuplicate(true);
// If the state of the initial image yet to be received is unknown,
// we must not apply the event. It may already be reflected in the
@@ -1211,29 +1193,29 @@ public abstract class DistributedCacheOperation {
}
}
- sendReply = operateOnRegion(event, dm) && sendReply;
+ sendReply = operateOnRegion(event, dm);
} finally {
if (event instanceof EntryEventImpl) {
((Releasable) event).release();
}
}
} catch (RegionDestroyedException ignore) {
- this.closed = true;
+ closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Region destroyed: nothing to do", this);
}
} catch (CancelException ignore) {
- this.closed = true;
+ closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
}
} catch (DiskAccessException e) {
- this.closed = true;
+ closed = true;
if (!lclRgn.isDestroyed()) {
logger.error("Got disk access exception, expected region to be destroyed", e);
}
} catch (EntryNotFoundException ignore) {
- this.appliedOperation = true;
+ appliedOperation = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Entry not found, nothing to do", this);
}
@@ -1256,7 +1238,7 @@ public abstract class DistributedCacheOperation {
SystemFailure.checkFailure();
thr = t;
} finally {
- checkVersionIsRecorded(this.versionTag, lclRgn);
+ checkVersionIsRecorded(versionTag, lclRgn);
if (sendReply) {
ReplyException rex = null;
if (thr != null) {
@@ -1273,10 +1255,9 @@ public abstract class DistributedCacheOperation {
public void sendReply(InternalDistributedMember recipient, int pId, ReplyException rex,
ReplySender dm) {
- if (pId == 0 && (dm instanceof DistributionManager) && !this.directAck) {// Fix for #41871
- // distributed-no-ack message. Don't respond
- } else {
- ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
+ // Don't respond to distributed-no-ack message.
+ if (pId != 0 || (!(dm instanceof DistributionManager)) || directAck) {
+ ReplyMessage.send(recipient, pId, rex, dm, !appliedOperation, closed, false,
isInternal());
}
}
@@ -1325,7 +1306,8 @@ public abstract class DistributedCacheOperation {
protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException;
- protected abstract boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm)
+ protected abstract boolean operateOnRegion(CacheEvent<?, ?> event,
+ ClusterDistributionManager dm)
throws EntryNotFoundException;
@Override
@@ -1333,7 +1315,7 @@ public abstract class DistributedCacheOperation {
StringBuilder buff = new StringBuilder();
buff.append(getShortClassName());
buff.append("(region path='"); // make sure this is the first one
- buff.append(this.regionPath);
+ buff.append(regionPath);
buff.append("'");
appendFields(buff);
buff.append(")");
@@ -1345,28 +1327,28 @@ public abstract class DistributedCacheOperation {
buff.append("; sender=");
buff.append(getSender());
buff.append("; callbackArg=");
- buff.append(this.callbackArg);
+ buff.append(callbackArg);
buff.append("; processorId=");
- buff.append(this.processorId);
+ buff.append(processorId);
buff.append("; op=");
- buff.append(this.op);
+ buff.append(op);
buff.append("; applied=");
- buff.append(this.appliedOperation);
+ buff.append(appliedOperation);
buff.append("; directAck=");
- buff.append(this.directAck);
+ buff.append(directAck);
buff.append("; posdup=");
- buff.append(this.possibleDuplicate);
+ buff.append(possibleDuplicate);
buff.append("; hasDelta=");
- buff.append(this.hasDelta);
+ buff.append(hasDelta);
buff.append("; hasOldValue=");
- buff.append(this.hasOldValue);
- if (this.versionTag != null) {
+ buff.append(hasOldValue);
+ if (versionTag != null) {
buff.append("; version=");
- buff.append(this.versionTag);
+ buff.append(versionTag);
}
- if (this.filterRouting != null) {
+ if (filterRouting != null) {
buff.append(" ");
- buff.append(this.filterRouting.toString());
+ buff.append(filterRouting);
}
}
@@ -1375,42 +1357,42 @@ public abstract class DistributedCacheOperation {
DeserializationContext context) throws IOException, ClassNotFoundException {
short bits = in.readShort();
short extBits = in.readShort();
- this.flags = bits;
+ flags = bits;
setFlags(bits, in);
- this.regionPath = DataSerializer.readString(in);
- this.op = Operation.fromOrdinal(in.readByte());
+ regionPath = DataSerializer.readString(in);
+ op = Operation.fromOrdinal(in.readByte());
// TODO dirack There's really no reason to send this flag across the wire
// anymore
- this.directAck = (bits & DIRECT_ACK_MASK) != 0;
- this.possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0;
+ directAck = (bits & DIRECT_ACK_MASK) != 0;
+ possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0;
if ((bits & CALLBACK_ARG_MASK) != 0) {
- this.callbackArg = DataSerializer.readObject(in);
+ callbackArg = DataSerializer.readObject(in);
}
- this.hasDelta = (bits & DELTA_MASK) != 0;
- this.hasOldValue = (bits & OLD_VALUE_MASK) != 0;
- if (this.hasOldValue) {
+ hasDelta = (bits & DELTA_MASK) != 0;
+ hasOldValue = (bits & OLD_VALUE_MASK) != 0;
+ if (hasOldValue) {
byte b = in.readByte();
if (b == 0) {
- this.oldValueIsSerialized = false;
+ oldValueIsSerialized = false;
} else if (b == 1) {
- this.oldValueIsSerialized = true;
+ oldValueIsSerialized = true;
} else {
throw new IllegalStateException("expected 0 or 1");
}
- this.oldValue = DataSerializer.readByteArray(in);
+ oldValue = DataSerializer.readByteArray(in);
}
boolean hasFilterInfo = (bits & FILTER_INFO_MASK) != 0;
- this.needsRouting = (bits & NEEDS_ROUTING_MASK) != 0;
+ needsRouting = (bits & NEEDS_ROUTING_MASK) != 0;
if (hasFilterInfo) {
- this.filterRouting = new FilterRoutingInfo();
- InternalDataSerializer.invokeFromData(this.filterRouting, in);
+ filterRouting = new FilterRoutingInfo();
+ InternalDataSerializer.invokeFromData(filterRouting, in);
}
if ((bits & VERSION_TAG_MASK) != 0) {
boolean persistentTag = (bits & PERSISTENT_TAG_MASK) != 0;
- this.versionTag = VersionTag.create(persistentTag, in);
+ versionTag = VersionTag.create(persistentTag, in);
}
if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) {
- this.inhibitAllNotifications = true;
+ inhibitAllNotifications = true;
}
}
@@ -1423,68 +1405,68 @@ public abstract class DistributedCacheOperation {
extendedBits = computeCompressedExtBits(extendedBits);
out.writeShort(bits);
out.writeShort(extendedBits);
- if (this.processorId > 0) {
- out.writeInt(this.processorId);
+ if (processorId > 0) {
+ out.writeInt(processorId);
}
- DataSerializer.writeString(this.regionPath, out);
- out.writeByte(this.op.ordinal);
- if (this.callbackArg != null) {
- DataSerializer.writeObject(this.callbackArg, out);
+ DataSerializer.writeString(regionPath, out);
+ out.writeByte(op.ordinal);
+ if (callbackArg != null) {
+ DataSerializer.writeObject(callbackArg, out);
}
- if (this.hasOldValue) {
- out.writeByte(this.oldValueIsSerialized ? 1 : 0);
+ if (hasOldValue) {
+ out.writeByte(oldValueIsSerialized ? 1 : 0);
// the receiving side expects that the old value will have been serialized
// as a byte array
- final byte policy = valueIsToDeserializationPolicy(this.oldValueIsSerialized);
+ final byte policy = valueIsToDeserializationPolicy(oldValueIsSerialized);
final Object vObj;
final byte[] vBytes;
- if (!this.oldValueIsSerialized && this.oldValue instanceof byte[]) {
+ if (!oldValueIsSerialized && oldValue instanceof byte[]) {
vObj = null;
- vBytes = (byte[]) this.oldValue;
+ vBytes = (byte[]) oldValue;
} else {
- vObj = this.oldValue;
+ vObj = oldValue;
vBytes = null;
}
writeValue(policy, vObj, vBytes, out);
}
- if (this.filterRouting != null) {
- InternalDataSerializer.invokeToData(this.filterRouting, out);
+ if (filterRouting != null) {
+ InternalDataSerializer.invokeToData(filterRouting, out);
}
- if (this.versionTag != null) {
- InternalDataSerializer.invokeToData(this.versionTag, out);
+ if (versionTag != null) {
+ InternalDataSerializer.invokeToData(versionTag, out);
}
}
protected short computeCompressedShort(short bits) {
- if (this.hasOldValue) {
+ if (hasOldValue) {
bits |= OLD_VALUE_MASK;
}
- if (this.directAck) {
+ if (directAck) {
bits |= DIRECT_ACK_MASK;
}
- if (this.possibleDuplicate) {
+ if (possibleDuplicate) {
bits |= POSSIBLE_DUPLICATE_MASK;
}
- if (this.processorId != 0) {
+ if (processorId != 0) {
bits |= HAS_PROCESSOR_ID;
}
- if (this.callbackArg != null) {
+ if (callbackArg != null) {
bits |= CALLBACK_ARG_MASK;
}
- if (this.hasDelta) {
+ if (hasDelta) {
bits |= DELTA_MASK;
}
- if (this.filterRouting != null) {
+ if (filterRouting != null) {
bits |= FILTER_INFO_MASK;
}
- if (this.needsRouting) {
+ if (needsRouting) {
bits |= NEEDS_ROUTING_MASK;
}
- if (this.versionTag != null) {
+ if (versionTag != null) {
bits |= VERSION_TAG_MASK;
}
- if (this.versionTag instanceof DiskVersionTag) {
+ if (versionTag instanceof DiskVersionTag) {
bits |= PERSISTENT_TAG_MASK;
}
if (inhibitAllNotifications) {
@@ -1502,14 +1484,14 @@ public abstract class DistributedCacheOperation {
protected void setFlags(short bits, DataInput in) throws IOException, ClassNotFoundException {
if ((bits & HAS_PROCESSOR_ID) != 0) {
- this.processorId = in.readInt();
- ReplyProcessor21.setMessageRPId(this.processorId);
+ processorId = in.readInt();
+ ReplyProcessor21.setMessageRPId(processorId);
}
}
@Override
public boolean supportsDirectAck() {
- return this.directAck;
+ return directAck;
}
public void setSendDelta(boolean sendDelta) {
@@ -1533,16 +1515,16 @@ public abstract class DistributedCacheOperation {
@Override
public void importOldObject(Object ov, boolean isSerialized) {
- this.oldValueIsSerialized = isSerialized;
- this.oldValue = ov;
- this.hasOldValue = true;
+ oldValueIsSerialized = isSerialized;
+ oldValue = ov;
+ hasOldValue = true;
}
@Override
public void importOldBytes(byte[] ov, boolean isSerialized) {
- this.oldValueIsSerialized = isSerialized;
- this.oldValue = ov;
- this.hasOldValue = true;
+ oldValueIsSerialized = isSerialized;
+ oldValue = ov;
+ hasOldValue = true;
}
protected boolean notifiesSerialGatewaySender(ClusterDistributionManager dm) {
@@ -1564,26 +1546,22 @@ public abstract class DistributedCacheOperation {
/** Custom subclass that keeps all ReplyExceptions */
private static class ReliableCacheReplyProcessor extends CacheOperationReplyProcessor {
- private final Set failedMembers;
+ private final Set<InternalDistributedMember> failedMembers;
- private final DistributionManager dm;
-
- public ReliableCacheReplyProcessor(InternalDistributedSystem system, Collection initMembers,
- Set departedMembers) {
+ public ReliableCacheReplyProcessor(InternalDistributedSystem system,
+ Collection<InternalDistributedMember> initMembers,
+ Set<InternalDistributedMember> departedMembers) {
super(system, initMembers);
- this.dm = system.getDistributionManager();
- this.failedMembers = departedMembers;
+ failedMembers = departedMembers;
}
@Override
protected synchronized void processException(DistributionMessage dmsg, ReplyException ex) {
Throwable cause = ex.getCause();
// only interested in CacheClosedException and RegionDestroyedException
- if (cause instanceof CancelException || cause instanceof RegionDestroyedException) {
- this.failedMembers.add(dmsg.getSender());
- } else {
+ failedMembers.add(dmsg.getSender());
+ if (!(cause instanceof CancelException) && !(cause instanceof RegionDestroyedException)) {
// allow superclass to handle all other exceptions
- this.failedMembers.add(dmsg.getSender());
super.processException(dmsg, ex);
}
}
@@ -1594,7 +1572,7 @@ public abstract class DistributedCacheOperation {
if (logger.isDebugEnabled()) {
logger.debug("{} replied with ignored true", dmsg.getSender());
}
- this.failedMembers.add(dmsg.getSender());
+ failedMembers.add(dmsg.getSender());
}
super.process(dmsg, warn);
}
@@ -1605,7 +1583,8 @@ public abstract class DistributedCacheOperation {
public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
- public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
+ public CacheOperationReplyProcessor(InternalDistributedSystem system,
+ Collection<InternalDistributedMember> initMembers) {
super(system, initMembers);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index 606fac6..ff4bb2a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -41,13 +41,14 @@ public class DistributedCacheOperationTest {
@Test
public void shouldBeMockable() throws Exception {
DistributedCacheOperation mockDistributedCacheOperation = mock(DistributedCacheOperation.class);
+ @SuppressWarnings("unused") // forces CacheOperationMessage to be mockable
CacheOperationMessage mockCacheOperationMessage = mock(CacheOperationMessage.class);
Map<InternalDistributedMember, PersistentMemberID> persistentIds = new HashMap<>();
when(mockDistributedCacheOperation.supportsDirectAck()).thenReturn(false);
- mockDistributedCacheOperation.waitForAckIfNeeded(mockCacheOperationMessage, persistentIds);
+ mockDistributedCacheOperation.waitForAckIfNeeded(persistentIds);
- verify(mockDistributedCacheOperation, times(1)).waitForAckIfNeeded(mockCacheOperationMessage,
+ verify(mockDistributedCacheOperation, times(1)).waitForAckIfNeeded(
persistentIds);
assertThat(mockDistributedCacheOperation.supportsDirectAck()).isFalse();
@@ -80,7 +81,7 @@ public class DistributedCacheOperationTest {
boolean endOperationInvoked;
DistributedRegion region;
- public TestOperation(CacheEvent event) {
+ public TestOperation(CacheEvent<?, ?> event) {
super(event);
}
@@ -114,7 +115,7 @@ public class DistributedCacheOperationTest {
@Test
public void testDoRemoveDestroyTokensFromCqResultKeys() {
Object key = new Object();
- HashMap hashMap = new HashMap();
+ HashMap<Long, Integer> hashMap = new HashMap<>();
hashMap.put(1L, MessageType.LOCAL_DESTROY);
EntryEventImpl baseEvent = mock(EntryEventImpl.class);
ServerCQ serverCQ = mock(ServerCQ.class);
@@ -123,7 +124,7 @@ public class DistributedCacheOperationTest {
new DestroyOperation(baseEvent);
when(baseEvent.getKey()).thenReturn(key);
when(filterInfo.getCQs()).thenReturn(hashMap);
- when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+ when(serverCQ.getFilterID()).thenReturn(1L);
doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);