You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/08/17 18:17:42 UTC

[geode] 10/18: GEODE-6588: Cleanup AbstractUpdateOperation

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7e0888b2542bc027ec6889d007725b0dcbe029f7
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Thu May 20 15:43:21 2021 -0700

    GEODE-6588: Cleanup AbstractUpdateOperation
---
 .../internal/cache/AbstractUpdateOperation.java    |  57 +--
 .../internal/cache/CacheDistributionAdvisor.java   |  34 +-
 .../internal/cache/DistributedCacheOperation.java  | 387 ++++++++++-----------
 .../cache/DistributedCacheOperationTest.java       |  11 +-
 4 files changed, 224 insertions(+), 265 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 69bfa39..32df318 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -33,7 +33,6 @@ import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
-import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.cache.versions.VersionTag;
@@ -57,13 +56,13 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
 
   private final long lastModifiedTime;
 
-  public AbstractUpdateOperation(CacheEvent event, long lastModifiedTime) {
+  public AbstractUpdateOperation(CacheEvent<?, ?> event, long lastModifiedTime) {
     super(event);
     this.lastModifiedTime = lastModifiedTime;
   }
 
   @Override
-  protected Set getRecipients() {
+  protected Set<InternalDistributedMember> getRecipients() {
     CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor();
     return advisor.adviseUpdate(getEvent());
   }
@@ -72,11 +71,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
   protected void initMessage(CacheOperationMessage msg, DirectReplyProcessor pr) {
     super.initMessage(msg, pr);
     AbstractUpdateMessage m = (AbstractUpdateMessage) msg;
-    DistributedRegion region = getRegion();
-    DistributionManager mgr = region.getDistributionManager();
-    // [bruce] We might have to stop using cacheTimeMillis because it causes a skew between
-    // lastModified and the version tag's timestamp
-    m.lastModified = this.lastModifiedTime;
+    m.lastModified = lastModifiedTime;
   }
 
   private static final boolean ALWAYS_REPLICATE_UPDATES =
@@ -85,19 +80,12 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
   /** @return whether we should do a local create for a remote one */
   private static boolean shouldDoRemoteCreate(LocalRegion rgn, EntryEventImpl ev) {
     DataPolicy dp = rgn.getAttributes().getDataPolicy();
-    if (!rgn.isAllEvents() || (dp.withReplication() && rgn.isInitialized()
-        && ev.getOperation().isUpdate() && !rgn.getConcurrencyChecksEnabled()
-        // misordered CREATE and
-        // UPDATE messages can
-        // cause inconsistencies
-        && !ALWAYS_REPLICATE_UPDATES)) {
-      // we are not accepting all events
-      // or we are a replicate and initialized and it was an update
-      // (we exclude that latter to avoid resurrecting a key deleted in a replicate
-      return false;
-    } else {
-      return true;
-    }
+    // Returns false when we are not accepting all events, or when we are an initialized replicate
+    // and the operation is an update (the latter is excluded to avoid resurrecting a key deleted
+    // in a replicate; misordered CREATE and UPDATE messages can cause inconsistencies).
+    return rgn.isAllEvents() && (!dp.withReplication() || !rgn.isInitialized()
+        || !ev.getOperation().isUpdate() || rgn.getConcurrencyChecksEnabled()
+        || ALWAYS_REPLICATE_UPDATES);
   }
 
   private static boolean checkIfToUpdateAfterCreateFailed(LocalRegion rgn, EntryEventImpl ev) {
@@ -200,7 +188,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
               if (logger.isTraceEnabled()) {
                 logger.trace("Processing put key {} in region {}", ev.getKey(), rgn.getFullPath());
               }
-              updated = true;
             } else {
               // key not here, blocked by DESTROYED token or ConcurrentCacheModificationException
               // thrown during second update attempt
@@ -211,7 +198,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
                     rgn.basicUpdate(ev, false, false, lastMod, true, invokeCallbacks, false);
                 if (thirdBasicUpdateSuccess) {
                   rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
-                  updated = true;
                 }
               } else {
                 if (rgn.getVersionVector() != null && ev.getVersionTag() != null) {
@@ -264,25 +250,20 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
     protected long lastModified;
 
     @Override
-    protected boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm)
+    protected boolean operateOnRegion(CacheEvent<?, ?> event, ClusterDistributionManager dm)
         throws EntryNotFoundException {
       EntryEventImpl ev = (EntryEventImpl) event;
       DistributedRegion rgn = (DistributedRegion) ev.getRegion();
-      DistributionManager mgr = dm;
-      boolean sendReply = true; // by default tell caller to send ack
 
-      // if (!rgn.hasSeenEvent((InternalCacheEvent)event)) {
       if (!rgn.isCacheContentProxy()) {
         basicOperateOnRegion(ev, rgn);
-      }
-      // }
-      else {
+      } else {
         if (logger.isDebugEnabled()) {
           logger.debug("UpdateMessage: this cache has already seen this event {}", event);
         }
       }
 
-      return sendReply;
+      return true; // tell caller to send ack
     }
 
     // @todo darrel: make this method static?
@@ -297,15 +278,15 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
         logger.debug("Processing  {}", this);
       }
       try {
-        long time = this.lastModified;
+        long time = lastModified;
         if (ev.getVersionTag() != null) {
           checkVersionTag(rgn, ev.getVersionTag());
           time = ev.getVersionTag().getVersionTimeStamp();
         }
-        this.appliedOperation = doPutOrCreate(rgn, ev, time);
+        appliedOperation = doPutOrCreate(rgn, ev, time);
       } catch (ConcurrentCacheModificationException e) {
         dispatchElidedEvent(rgn, ev);
-        this.appliedOperation = false;
+        appliedOperation = false;
       }
     }
 
@@ -313,25 +294,25 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
     protected void appendFields(StringBuilder buff) {
       super.appendFields(buff);
       buff.append("; lastModified=");
-      buff.append(this.lastModified);
+      buff.append(lastModified);
     }
 
     @Override
     public void fromData(DataInput in,
         DeserializationContext context) throws IOException, ClassNotFoundException {
       super.fromData(in, context);
-      this.lastModified = in.readLong();
+      lastModified = in.readLong();
     }
 
     @Override
     public void toData(DataOutput out,
         SerializationContext context) throws IOException {
       super.toData(out, context);
-      out.writeLong(this.lastModified);
+      out.writeLong(lastModified);
     }
 
     protected void checkVersionTag(DistributedRegion rgn, VersionTag tag) {
-      RegionAttributes attr = rgn.getAttributes();
+      RegionAttributes<?, ?> attr = rgn.getAttributes();
       if (attr.getConcurrencyChecksEnabled() && attr.getDataPolicy().withPersistence()
           && attr.getScope() != Scope.GLOBAL
           && (tag.getMemberID() == null || test_InvalidVersion)) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
index 5879cb6..3d49278 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheDistributionAdvisor.java
@@ -188,7 +188,8 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * Provide recipient information for an update or create operation.
    *
    */
-  Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
+  Set<InternalDistributedMember> adviseUpdate(final EntryEventImpl event)
+      throws IllegalStateException {
     if (event.hasNewValue() || event.getOperation().isPutAll()) {
       // only need to distribute it to members that want all events or cache data
       return adviseAllEventsOrCached();
@@ -246,7 +247,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
       });
     } else {
       StringBuilder badIds = new StringBuilder();
-      Iterator biI = badList.iterator();
+      Iterator<InternalDistributedMember> biI = badList.iterator();
       while (biI.hasNext()) {
         badIds.append(biI.next().toString());
         if (biI.hasNext()) {
@@ -254,8 +255,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
         }
       }
       throw new IllegalStateException(
-          String.format("Illegal Region Configuration for members: %s",
-              badIds.toString()));
+          String.format("Illegal Region Configuration for members: %s", badIds));
     }
   }
 
@@ -265,7 +265,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * @return Set of Serializable members that have a CacheLoader installed; no reference to Set kept
    *         by advisor so caller is free to modify it
    */
-  public Set adviseNetLoad() {
+  public Set<InternalDistributedMember> adviseNetLoad() {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile prof = (CacheProfile) profile;
@@ -279,7 +279,8 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
     });
   }
 
-  public FilterRoutingInfo adviseFilterRouting(CacheEvent event, Set cacheOpRecipients) {
+  public FilterRoutingInfo adviseFilterRouting(CacheEvent<?, ?> event,
+      Set<InternalDistributedMember> cacheOpRecipients) {
     FilterProfile fp = ((LocalRegion) event.getRegion()).getFilterProfile();
     if (fp != null) {
       return fp.getFilterRoutingInfoPart1(event, profiles, cacheOpRecipients);
@@ -310,7 +311,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
   /**
    * Same as adviseGeneric
    */
-  public Set adviseDestroyRegion() {
+  public Set<InternalDistributedMember> adviseDestroyRegion() {
     return adviseGeneric();
   }
 
@@ -320,7 +321,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * @return Set of Serializable member ids that have a CacheWriter installed; no reference to Set
    *         kept by advisor so caller is free to modify it
    */
-  public Set adviseNetWrite() {
+  public Set<InternalDistributedMember> adviseNetWrite() {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile prof = (CacheProfile) profile;
@@ -347,7 +348,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * @return Set of Serializable member ids that have the region and are have storage (no need to
    *         search an empty cache)
    */
-  Set adviseNetSearch() {
+  Set<InternalDistributedMember> adviseNetSearch() {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
@@ -452,7 +453,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    *
    * @since GemFire 5.5
    */
-  Set adviseRequiresOldValueInCacheOp() {
+  Set<InternalDistributedMember> adviseRequiresOldValueInCacheOp() {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
@@ -675,11 +676,11 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
 
     public boolean getInRecovery() {
       return inRecovery;
-    };
+    }
 
     public void setInRecovery(boolean recovery) {
       inRecovery = recovery;
-    };
+    }
 
     public DataPolicy getDataPolicy() {
       return dataPolicy;
@@ -743,9 +744,6 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
      * Return true if cached or allEvents and a listener
      */
     boolean cachedOrAllEventsWithListener() {
-      // to fix bug 36804 to ignore hasCacheListener
-      // return this.dataPolicy.withStorage() ||
-      // (allEvents() && this.hasCacheListener);
       return cachedOrAllEvents();
     }
 
@@ -1038,7 +1036,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * @return the set of preloaded's memberIds
    * @since GemFire prPersistSprint1
    */
-  public Set advisePreloadeds() {
+  public Set<InternalDistributedMember> advisePreloadeds() {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
@@ -1052,7 +1050,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
    * @return the set of replicate's memberIds
    * @since GemFire 5.8
    */
-  Set adviseEmptys() {
+  Set<InternalDistributedMember> adviseEmptys() {
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
       CacheProfile cp = (CacheProfile) profile;
@@ -1106,7 +1104,7 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
     return result;
   }
 
-  Set adviseCacheServers() {
+  Set<InternalDistributedMember> adviseCacheServers() {
     getAdvisee().getCancelCriterion().checkCancelInProgress(null);
     return adviseFilter(profile -> {
       assert profile instanceof CacheProfile;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 4a53ef6..e32a2af 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.Vector;
 
 import org.apache.logging.log4j.Logger;
 
@@ -138,11 +137,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-  // static values for oldValueIsObject
-  public static final byte VALUE_IS_BYTES = 0;
-  public static final byte VALUE_IS_SERIALIZED_OBJECT = 1;
-  public static final byte VALUE_IS_OBJECT = 2;
-
   /**
    * Given a VALUE_IS_* constant convert and return the corresponding DESERIALIZATION_POLICY_*.
    */
@@ -169,9 +163,9 @@ public abstract class DistributedCacheOperation {
 
   protected CacheOperationReplyProcessor processor = null;
 
-  protected Set departedMembers;
+  protected Set<InternalDistributedMember> departedMembers;
 
-  protected Set originalRecipients;
+  protected Set<InternalDistributedMember> originalRecipients;
 
   @MutableForTesting
   static Runnable internalBeforePutOutgoing;
@@ -188,7 +182,7 @@ public abstract class DistributedCacheOperation {
   }
 
   /** Creates a new instance of DistributedCacheOperation */
-  public DistributedCacheOperation(CacheEvent event) {
+  public DistributedCacheOperation(CacheEvent<?, ?> event) {
     this.event = (InternalCacheEvent) event;
   }
 
@@ -200,24 +194,19 @@ public abstract class DistributedCacheOperation {
    * @since GemFire 5.0
    */
   boolean isOperationReliable() {
-    Operation op = this.event.getOperation();
+    Operation op = event.getOperation();
     if (!op.isRegionDestroy()) {
       return true;
     }
-    if (op.isDistributed()) {
-      return true;
-    }
     // must be a region destroy that is "local" which means
     // Region.localDestroyRegion or Region.close or Cache.clsoe
     // none of these should do reliability checks
-    return false;
+    return op.isDistributed();
   }
 
   public boolean supportsDirectAck() {
     // force use of shared connection if we're already in a secondary
-    // thread-owned reader thread. See bug #49565. Also see Connection#processNIOBuffer
-    // int dominoCount = org.apache.geode.internal.tcp.Connection.getDominoCount();
-    // return dominoCount < 2;
+    // thread-owned reader thread. See Connection#processNIOBuffer
     return true;
   }
 
@@ -267,7 +256,7 @@ public abstract class DistributedCacheOperation {
     DistributedRegion region = getRegion();
     long viewVersion = -1;
     try {
-      if (this.containsRegionContentChange()) {
+      if (containsRegionContentChange()) {
         viewVersion = region.getDistributionAdvisor().startOperation();
       }
       if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP_VERBOSE)) {
@@ -354,13 +343,13 @@ public abstract class DistributedCacheOperation {
       }
 
       // some members requiring old value are also in the cache op recipients set
-      Set needsOldValueInCacheOp = Collections.emptySet();
+      Set<InternalDistributedMember> needsOldValueInCacheOp = Collections.emptySet();
 
       // set client routing information into the event
       boolean routingComputed = false;
       FilterRoutingInfo filterRouting = null;
       // recipients that will get a cacheop msg and also a PR message
-      Set twoMessages = Collections.emptySet();
+      Set<InternalDistributedMember> twoMessages = Collections.emptySet();
       if (region.isUsedForPartitionedRegionBucket()) {
         twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
         routingComputed = true;
@@ -374,12 +363,12 @@ public abstract class DistributedCacheOperation {
 
       // some members need PR notification of the change for client/wan
       // notification
-      Set adjunctRecipients = Collections.emptySet();
+      Set<InternalDistributedMember> adjunctRecipients = Collections.emptySet();
 
       // Partitioned region listener notification messages piggyback on this
       // operation's replyprocessor and need to be sent at the same time as
       // the operation's message
-      if (this.supportsAdjunctMessaging() && region.isUsedForPartitionedRegionBucket()) {
+      if (supportsAdjunctMessaging() && region.isUsedForPartitionedRegionBucket()) {
         BucketRegion br = (BucketRegion) region;
         adjunctRecipients = getAdjunctReceivers(br, recipients, twoMessages, filterRouting);
       }
@@ -388,7 +377,7 @@ public abstract class DistributedCacheOperation {
 
       if (entryEvent != null && entryEvent.hasOldValue()) {
         if (testSendingOldValues) {
-          needsOldValueInCacheOp = new HashSet(recipients);
+          needsOldValueInCacheOp = new HashSet<>(recipients);
         } else {
           needsOldValueInCacheOp =
               region.getCacheDistributionAdvisor().adviseRequiresOldValueInCacheOp();
@@ -396,14 +385,14 @@ public abstract class DistributedCacheOperation {
         recipients.removeAll(needsOldValueInCacheOp);
       }
 
-      Set cachelessNodes = Collections.emptySet();
-      Set adviseCacheServers;
+      Set<InternalDistributedMember> cachelessNodes = Collections.emptySet();
+      Set<InternalDistributedMember> adviseCacheServers;
       Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = Collections.emptySet();
-      if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
+      if (region.getDistributionConfig().getDeltaPropagation() && supportsDeltaPropagation()) {
         cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
         if (!cachelessNodes.isEmpty()) {
-          List list = new ArrayList(cachelessNodes);
-          for (Object member : cachelessNodes) {
+          List<InternalDistributedMember> list = new ArrayList<>(cachelessNodes);
+          for (InternalDistributedMember member : cachelessNodes) {
             if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
               // Don't include those originally excluded.
               list.remove(member);
@@ -437,26 +426,24 @@ public abstract class DistributedCacheOperation {
             }
           }
         }
-        if (!reliableOp || region.isNoDistributionOk()) {
-          // nothing needs be done in this case
-        } else {
+        if (reliableOp && !region.isNoDistributionOk()) {
           region.handleReliableDistribution(Collections.emptySet());
         }
 
         // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
-          this.event.setLocalFilterInfo(filterInfo);
+          event.setLocalFilterInfo(filterInfo);
         }
 
       } else {
         boolean directAck = false;
         boolean useMulticast = region.getMulticastEnabled()
-            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
+            && region.getSystem().getConfig().getMcastPort() != 0 && supportsMulticast();
         boolean shouldAck = shouldAck();
 
         if (shouldAck) {
-          if (this.supportsDirectAck() && adjunctRecipients.isEmpty()) {
+          if (supportsDirectAck() && adjunctRecipients.isEmpty()) {
             if (region.getSystem().threadOwnsResources()) {
               directAck = true;
             }
@@ -481,28 +468,25 @@ public abstract class DistributedCacheOperation {
         if (shouldAck) {
           // adjunct messages are sent using the same reply processor, so
           // add them to the processor's membership set
-          Collection waitForMembers = null;
-          if (recipients.size() > 0 && adjunctRecipients.size() == 0 && cachelessNodes.isEmpty()) { // the
-                                                                                                    // common
-                                                                                                    // case
+          final Collection<InternalDistributedMember> waitForMembers;
+          if (recipients.size() > 0 && adjunctRecipients.size() == 0 && cachelessNodes.isEmpty()) {
+            // the common case
             waitForMembers = recipients;
           } else if (!cachelessNodes.isEmpty()) {
-            waitForMembers = new HashSet(recipients);
+            waitForMembers = new HashSet<>(recipients);
             waitForMembers.addAll(cachelessNodes);
           } else {
-            // note that we use a Vector instead of a Set for the responders
-            // collection
-            // because partitioned regions sometimes send both a regular cache
-            // operation and a partitioned-region notification message to the
-            // same recipient
-            waitForMembers = new Vector(recipients);
+            // note that we use a List instead of a Set for the responders
+            // collection because partitioned regions sometimes send both a regular cache
+            // operation and a partitioned-region notification message to the same recipient
+            waitForMembers = new ArrayList<>(recipients);
             waitForMembers.addAll(adjunctRecipients);
             waitForMembers.addAll(needsOldValueInCacheOp);
             waitForMembers.addAll(cachelessNodes);
           }
           if (DistributedCacheOperation.LOSS_SIMULATION_RATIO != 0.0) {
             if (LOSS_SIMULATION_GENERATOR == null) {
-              LOSS_SIMULATION_GENERATOR = new Random(this.hashCode());
+              LOSS_SIMULATION_GENERATOR = new Random(hashCode());
             }
             if ((LOSS_SIMULATION_GENERATOR.nextInt(100) * 1.0 / 100.0) < LOSS_SIMULATION_RATIO) {
               if (logger.isDebugEnabled()) {
@@ -514,24 +498,23 @@ public abstract class DistributedCacheOperation {
             }
           }
           if (reliableOp) {
-            this.departedMembers = new HashSet();
-            this.processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers,
-                this.departedMembers);
+            departedMembers = new HashSet<>();
+            processor = new ReliableCacheReplyProcessor(region.getSystem(), waitForMembers,
+                departedMembers);
           } else {
-            this.processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers);
+            processor = new CacheOperationReplyProcessor(region.getSystem(), waitForMembers);
           }
         }
 
-        Set failures = null;
         CacheOperationMessage msg = createMessage();
-        initMessage(msg, this.processor);
+        initMessage(msg, processor);
 
         if (DistributedCacheOperation.internalBeforePutOutgoing != null) {
           DistributedCacheOperation.internalBeforePutOutgoing.run();
         }
 
         if (processor != null && msg.isSevereAlertCompatible()) {
-          this.processor.enableSevereAlertProcessing();
+          processor.enableSevereAlertProcessing();
           // if this message is distributing for a partitioned region message,
           // we can't wait as long as the full ack-severe-alert-threshold or
           // the sender might kick us out of the system before we can get an ack
@@ -570,14 +553,14 @@ public abstract class DistributedCacheOperation {
         }
 
         msg.setRecipients(recipients);
-        failures = mgr.putOutgoing(msg);
+        Set<InternalDistributedMember> failures = mgr.putOutgoing(msg);
 
         // distribute to members needing the old value now
         if (needsOldValueInCacheOp.size() > 0) {
-          msg.appendOldValueToMessage((EntryEventImpl) this.event);
+          msg.appendOldValueToMessage((EntryEventImpl) event);
           msg.resetRecipients();
           msg.setRecipients(needsOldValueInCacheOp);
-          Set newFailures = mgr.putOutgoing(msg);
+          Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg);
           if (newFailures != null) {
             if (logger.isDebugEnabled()) {
               logger.debug("Failed sending ({}) to {}", msg, newFailures);
@@ -596,7 +579,7 @@ public abstract class DistributedCacheOperation {
             msg.resetRecipients();
             msg.setRecipients(cachelessNodes);
             msg.setSendDelta(false);
-            Set newFailures = mgr.putOutgoing(msg);
+            Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg);
             if (newFailures != null) {
               if (failures != null && failures.size() > 0) {
                 failures.addAll(newFailures);
@@ -611,7 +594,7 @@ public abstract class DistributedCacheOperation {
             msg.setRecipients(cachelessNodesWithNoCacheServer);
             msg.setSendDelta(false);
             ((UpdateMessage) msg).setSendDeltaWithFullValue(false);
-            Set newFailures = mgr.putOutgoing(msg);
+            Set<InternalDistributedMember> newFailures = mgr.putOutgoing(msg);
             if (newFailures != null) {
               if (failures != null && failures.size() > 0) {
                 failures.addAll(newFailures);
@@ -628,7 +611,6 @@ public abstract class DistributedCacheOperation {
           logger.debug("Failed sending ({}) to {} while processing event:{}", msg, failures, event);
         }
 
-        Set<InternalDistributedMember> adjunctRecipientsWithNoCacheServer = Collections.emptySet();
         // send partitioned region listener notification messages now
         if (!adjunctRecipients.isEmpty()) {
           if (cachelessNodes.size() > 0) {
@@ -640,29 +622,31 @@ public abstract class DistributedCacheOperation {
               recipients.addAll(cachelessNodes);
             }
           }
-          adjunctRecipientsWithNoCacheServer = new HashSet<>(adjunctRecipients);
+
+          final Set<InternalDistributedMember> adjunctRecipientsWithNoCacheServer =
+              new HashSet<>(adjunctRecipients);
           adviseCacheServers = ((Bucket) region).getPartitionedRegion()
               .getCacheDistributionAdvisor().adviseCacheServers();
           adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
 
           if (isPutAll) {
             ((BucketRegion) region).performPutAllAdjunctMessaging((DistributedPutAllOperation) this,
-                recipients, adjunctRecipients, filterRouting, this.processor);
+                recipients, adjunctRecipients, filterRouting, processor);
           } else if (isRemoveAll) {
             ((BucketRegion) region).performRemoveAllAdjunctMessaging(
                 (DistributedRemoveAllOperation) this, recipients, adjunctRecipients, filterRouting,
-                this.processor);
+                processor);
           } else {
             boolean calculateDelta =
                 adjunctRecipientsWithNoCacheServer.size() < adjunctRecipients.size();
             adjunctRecipients.removeAll(adjunctRecipientsWithNoCacheServer);
             if (!adjunctRecipients.isEmpty()) {
               ((BucketRegion) region).performAdjunctMessaging(getEvent(), recipients,
-                  adjunctRecipients, filterRouting, this.processor, calculateDelta, true);
+                  adjunctRecipients, filterRouting, processor, calculateDelta, true);
             }
             if (!adjunctRecipientsWithNoCacheServer.isEmpty()) {
               ((BucketRegion) region).performAdjunctMessaging(getEvent(), recipients,
-                  adjunctRecipientsWithNoCacheServer, filterRouting, this.processor, calculateDelta,
+                  adjunctRecipientsWithNoCacheServer, filterRouting, processor, calculateDelta,
                   false);
             }
           }
@@ -674,10 +658,10 @@ public abstract class DistributedCacheOperation {
           event.setLocalFilterInfo(filterInfo);
         }
 
-        waitForAckIfNeeded(msg, persistentIds);
+        waitForAckIfNeeded(persistentIds);
 
         if (/* msg != null && */reliableOp) {
-          Set successfulRecips = new HashSet(recipients);
+          Set<InternalDistributedMember> successfulRecips = new HashSet<>(recipients);
           successfulRecips.addAll(cachelessNodes);
           successfulRecips.addAll(needsOldValueInCacheOp);
           if (failures != null && !failures.isEmpty()) {
@@ -747,8 +731,8 @@ public abstract class DistributedCacheOperation {
       // the entry form CQ cache.
       if (cq != null && cq.getFilterID() != null && cq.getFilterID().equals(cqID)
           && e.getValue() != null && e.getValue().equals(MessageType.LOCAL_DESTROY)
-          && ((EntryOperation) event).getKey() != null) {
-        cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
+          && ((EntryOperation<?, ?>) event).getKey() != null) {
+        cq.removeFromCqResultKeys(((EntryOperation<?, ?>) event).getKey(), true);
       }
     }
   }
@@ -762,9 +746,10 @@ public abstract class DistributedCacheOperation {
    *        messages
    * @param routing client routing information
    */
-  Set getAdjunctReceivers(BucketRegion br, Set cacheOpReceivers, Set twoMessages,
+  Set<InternalDistributedMember> getAdjunctReceivers(BucketRegion br,
+      Set<InternalDistributedMember> cacheOpReceivers, Set<InternalDistributedMember> twoMessages,
       FilterRoutingInfo routing) {
-    return br.getAdjunctReceivers(this.getEvent(), cacheOpReceivers, twoMessages, routing);
+    return br.getAdjunctReceivers(getEvent(), cacheOpReceivers, twoMessages, routing);
   }
 
   /**
@@ -774,16 +759,16 @@ public abstract class DistributedCacheOperation {
     // nothing to do here - see UpdateMessage
   }
 
-  protected void waitForAckIfNeeded(CacheOperationMessage msg,
+  protected void waitForAckIfNeeded(
       Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
-    if (this.processor == null) {
+    if (processor == null) {
       return;
     }
     try {
       // keep waiting even if interrupted
       try {
-        this.processor.waitForRepliesUninterruptibly();
-        Set<InternalDistributedMember> closedMembers = this.processor.closedMembers.getSnapshot();
+        processor.waitForRepliesUninterruptibly();
+        Set<InternalDistributedMember> closedMembers = processor.closedMembers.getSnapshot();
         handleClosedMembers(closedMembers, persistentIds);
       } catch (ReplyException e) {
         if (this instanceof DestroyRegionOperation) {
@@ -792,7 +777,7 @@ public abstract class DistributedCacheOperation {
         e.handleCause();
       }
     } finally {
-      this.processor = null;
+      processor = null;
     }
   }
 
@@ -828,27 +813,28 @@ public abstract class DistributedCacheOperation {
   }
 
   protected DistributedRegion getRegion() {
-    return (DistributedRegion) this.event.getRegion();
+    return (DistributedRegion) event.getRegion();
   }
 
   protected EntryEventImpl getEvent() {
-    return (EntryEventImpl) this.event;
+    return (EntryEventImpl) event;
   }
 
-  protected Set getRecipients() {
+  protected Set<InternalDistributedMember> getRecipients() {
     CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor();
-    this.originalRecipients = advisor.adviseCacheOp();
-    return this.originalRecipients;
+    originalRecipients = advisor.adviseCacheOp();
+    return originalRecipients;
   }
 
-  protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
+  protected FilterRoutingInfo getRecipientFilterRouting(
+      Set<InternalDistributedMember> cacheOpRecipients) {
     LocalRegion region = getRegion();
     if (!region.isUsedForPartitionedRegionBucket()) {
       return null;
     }
     CacheDistributionAdvisor advisor;
     advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
-    return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
+    return advisor.adviseFilterRouting(event, cacheOpRecipients);
   }
 
   /**
@@ -860,7 +846,7 @@ public abstract class DistributedCacheOperation {
     if (fp == null) {
       return null;
     }
-    FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(frInfo, this.event);
+    FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(frInfo, event);
     if (fri == null) {
       return null;
     }
@@ -873,7 +859,7 @@ public abstract class DistributedCacheOperation {
     msg.regionPath = getRegion().getFullPath();
     msg.processorId = p == null ? 0 : p.getProcessorId();
     msg.processor = p;
-    if (this.event.getOperation().isEntry()) {
+    if (event.getOperation().isEntry()) {
       EntryEventImpl entryEvent = getEvent();
       msg.callbackArg = entryEvent.getRawCallbackArgument();
       msg.possibleDuplicate = entryEvent.isPossibleDuplicate();
@@ -885,9 +871,9 @@ public abstract class DistributedCacheOperation {
       }
 
     } else {
-      msg.callbackArg = ((RegionEventImpl) this.event).getRawCallbackArgument();
+      msg.callbackArg = ((RegionEventImpl) event).getRawCallbackArgument();
     }
-    msg.op = this.event.getOperation();
+    msg.op = event.getOperation();
     msg.owner = this;
     msg.regionAllowsConflation = getRegion().getEnableAsyncConflation();
 
@@ -896,7 +882,7 @@ public abstract class DistributedCacheOperation {
   @Override
   public String toString() {
     String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
-    return cname + "(" + this.event + ")";
+    return cname + "(" + event + ")";
   }
 
   /**
@@ -962,16 +948,16 @@ public abstract class DistributedCacheOperation {
     protected boolean inhibitAllNotifications;
 
     public Operation getOperation() {
-      return this.op;
+      return op;
     }
 
     /** sets the concurrency versioning tag for this message */
     public void setVersionTag(VersionTag tag) {
-      this.versionTag = tag;
+      versionTag = tag;
     }
 
     public VersionTag getVersionTag() {
-      return this.versionTag;
+      return versionTag;
     }
 
     @Override
@@ -988,21 +974,21 @@ public abstract class DistributedCacheOperation {
     @Override
     public void registerProcessor() {
       if (processor != null) {
-        this.processorId = this.processor.register();
+        processorId = processor.register();
       }
-      this.directAck = false;
+      directAck = false;
     }
 
     public void setFilterInfo(FilterRoutingInfo fInfo) {
-      this.filterRouting = fInfo;
+      filterRouting = fInfo;
     }
 
     public void setInhibitNotificationsBit(boolean inhibit) {
-      this.inhibitAllNotifications = inhibit;
+      inhibitAllNotifications = inhibit;
     }
 
     public String getRegionPath() {
-      return this.regionPath;
+      return regionPath;
     }
 
     /**
@@ -1043,11 +1029,11 @@ public abstract class DistributedCacheOperation {
       CqService cqService = event.getRegion().getCache().getCqService();
       if (cqService.isRunning()/* || event.getOperation().guaranteesOldValue() */) {
         event.setOldValueForQueryProcessing();
-        if (!event.hasOldValue() && this.hasOldValue) {
-          if (this.oldValueIsSerialized) {
-            event.setSerializedOldValue((byte[]) this.oldValue);
+        if (!event.hasOldValue() && hasOldValue) {
+          if (oldValueIsSerialized) {
+            event.setSerializedOldValue((byte[]) oldValue);
           } else {
-            event.setOldValue(this.oldValue);
+            event.setOldValue(oldValue);
           }
         }
       }
@@ -1059,15 +1045,15 @@ public abstract class DistributedCacheOperation {
      * @since GemFire 6.1
      */
     protected void setHasDelta(boolean flag) {
-      this.hasDelta = flag;
+      hasDelta = flag;
     }
 
     protected boolean hasDelta() {
-      return this.hasDelta;
+      return hasDelta;
     }
 
     public FilterRoutingInfo getFilterInfo() {
-      return this.filterRouting;
+      return filterRouting;
     }
 
     /**
@@ -1079,7 +1065,7 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public int getProcessorId() {
-      return this.processorId;
+      return processorId;
     }
 
     @Override
@@ -1088,9 +1074,9 @@ public abstract class DistributedCacheOperation {
     }
 
     protected LocalRegion getLocalRegionForProcessing(ClusterDistributionManager dm) {
-      Assert.assertTrue(this.regionPath != null, "regionPath was null");
+      Assert.assertTrue(regionPath != null, "regionPath was null");
       InternalCache gfc = dm.getExistingCache();
-      return (LocalRegion) gfc.getRegionByPathForProcessing(this.regionPath);
+      return (LocalRegion) gfc.getRegionByPathForProcessing(regionPath);
     }
 
     @Override
@@ -1098,11 +1084,11 @@ public abstract class DistributedCacheOperation {
       Throwable thr = null;
       boolean sendReply = true;
 
-      if (this.versionTag != null) {
-        this.versionTag.replaceNullIDs(getSender());
+      if (versionTag != null) {
+        versionTag.replaceNullIDs(getSender());
       }
 
-      EntryLogger.setSource(this.getSender(), "p2p");
+      EntryLogger.setSource(getSender(), "p2p");
       final InitializationLevel oldLevel =
           LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
       try {
@@ -1115,7 +1101,7 @@ public abstract class DistributedCacheOperation {
         sendReply = false;
         basicProcess(dm, lclRgn);
       } catch (CancelException ignore) {
-        this.closed = true;
+        closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
         }
@@ -1158,10 +1144,8 @@ public abstract class DistributedCacheOperation {
         logger.trace("DistributedCacheOperation.basicProcess: {}", this);
       }
       try {
-        // LocalRegion lclRgn = getRegionFromPath(dm.getSystem(),
-        // this.regionPath);
         if (lclRgn == null) {
-          this.closed = true;
+          closed = true;
           if (logger.isDebugEnabled()) {
             logger.debug("{} region not found, nothing to do", this);
           }
@@ -1169,8 +1153,6 @@ public abstract class DistributedCacheOperation {
         }
         // Could this cause a deadlock, because this can block a P2P reader
         // thread which might be needed to read the create region reply??
-        // DAN - I don't think this does anything because process called
-        // LocalRegion.setThreadInitLevelRequirement
         lclRgn.waitOnInitialization();
         // In some subclasses, lclRgn may be destroyed, so be careful not to
         // allow a RegionDestroyedException to be thrown on lclRgn access
@@ -1193,7 +1175,7 @@ public abstract class DistributedCacheOperation {
         try {
           boolean isEntry = event.getOperation().isEntry();
 
-          if (isEntry && this.possibleDuplicate) {
+          if (isEntry && possibleDuplicate) {
             ((EntryEventImpl) event).setPossibleDuplicate(true);
             // If the state of the initial image yet to be received is unknown,
             // we must not apply the event. It may already be reflected in the
@@ -1211,29 +1193,29 @@ public abstract class DistributedCacheOperation {
             }
           }
 
-          sendReply = operateOnRegion(event, dm) && sendReply;
+          sendReply = operateOnRegion(event, dm);
         } finally {
           if (event instanceof EntryEventImpl) {
             ((Releasable) event).release();
           }
         }
       } catch (RegionDestroyedException ignore) {
-        this.closed = true;
+        closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Region destroyed: nothing to do", this);
         }
       } catch (CancelException ignore) {
-        this.closed = true;
+        closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
         }
       } catch (DiskAccessException e) {
-        this.closed = true;
+        closed = true;
         if (!lclRgn.isDestroyed()) {
           logger.error("Got disk access exception, expected region to be destroyed", e);
         }
       } catch (EntryNotFoundException ignore) {
-        this.appliedOperation = true;
+        appliedOperation = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Entry not found, nothing to do", this);
         }
@@ -1256,7 +1238,7 @@ public abstract class DistributedCacheOperation {
         SystemFailure.checkFailure();
         thr = t;
       } finally {
-        checkVersionIsRecorded(this.versionTag, lclRgn);
+        checkVersionIsRecorded(versionTag, lclRgn);
         if (sendReply) {
           ReplyException rex = null;
           if (thr != null) {
@@ -1273,10 +1255,9 @@ public abstract class DistributedCacheOperation {
 
     public void sendReply(InternalDistributedMember recipient, int pId, ReplyException rex,
         ReplySender dm) {
-      if (pId == 0 && (dm instanceof DistributionManager) && !this.directAck) {// Fix for #41871
-        // distributed-no-ack message. Don't respond
-      } else {
-        ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
+      // Don't respond to distributed-no-ack message.
+      if (pId != 0 || (!(dm instanceof DistributionManager)) || directAck) {
+        ReplyMessage.send(recipient, pId, rex, dm, !appliedOperation, closed, false,
             isInternal());
       }
     }
@@ -1325,7 +1306,8 @@ public abstract class DistributedCacheOperation {
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
         throws EntryNotFoundException;
 
-    protected abstract boolean operateOnRegion(CacheEvent event, ClusterDistributionManager dm)
+    protected abstract boolean operateOnRegion(CacheEvent<?, ?> event,
+        ClusterDistributionManager dm)
         throws EntryNotFoundException;
 
     @Override
@@ -1333,7 +1315,7 @@ public abstract class DistributedCacheOperation {
       StringBuilder buff = new StringBuilder();
       buff.append(getShortClassName());
       buff.append("(region path='"); // make sure this is the first one
-      buff.append(this.regionPath);
+      buff.append(regionPath);
       buff.append("'");
       appendFields(buff);
       buff.append(")");
@@ -1345,28 +1327,28 @@ public abstract class DistributedCacheOperation {
       buff.append("; sender=");
       buff.append(getSender());
       buff.append("; callbackArg=");
-      buff.append(this.callbackArg);
+      buff.append(callbackArg);
       buff.append("; processorId=");
-      buff.append(this.processorId);
+      buff.append(processorId);
       buff.append("; op=");
-      buff.append(this.op);
+      buff.append(op);
       buff.append("; applied=");
-      buff.append(this.appliedOperation);
+      buff.append(appliedOperation);
       buff.append("; directAck=");
-      buff.append(this.directAck);
+      buff.append(directAck);
       buff.append("; posdup=");
-      buff.append(this.possibleDuplicate);
+      buff.append(possibleDuplicate);
       buff.append("; hasDelta=");
-      buff.append(this.hasDelta);
+      buff.append(hasDelta);
       buff.append("; hasOldValue=");
-      buff.append(this.hasOldValue);
-      if (this.versionTag != null) {
+      buff.append(hasOldValue);
+      if (versionTag != null) {
         buff.append("; version=");
-        buff.append(this.versionTag);
+        buff.append(versionTag);
       }
-      if (this.filterRouting != null) {
+      if (filterRouting != null) {
         buff.append(" ");
-        buff.append(this.filterRouting.toString());
+        buff.append(filterRouting);
       }
     }
 
@@ -1375,42 +1357,42 @@ public abstract class DistributedCacheOperation {
         DeserializationContext context) throws IOException, ClassNotFoundException {
       short bits = in.readShort();
       short extBits = in.readShort();
-      this.flags = bits;
+      flags = bits;
       setFlags(bits, in);
-      this.regionPath = DataSerializer.readString(in);
-      this.op = Operation.fromOrdinal(in.readByte());
+      regionPath = DataSerializer.readString(in);
+      op = Operation.fromOrdinal(in.readByte());
       // TODO dirack There's really no reason to send this flag across the wire
       // anymore
-      this.directAck = (bits & DIRECT_ACK_MASK) != 0;
-      this.possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0;
+      directAck = (bits & DIRECT_ACK_MASK) != 0;
+      possibleDuplicate = (bits & POSSIBLE_DUPLICATE_MASK) != 0;
       if ((bits & CALLBACK_ARG_MASK) != 0) {
-        this.callbackArg = DataSerializer.readObject(in);
+        callbackArg = DataSerializer.readObject(in);
       }
-      this.hasDelta = (bits & DELTA_MASK) != 0;
-      this.hasOldValue = (bits & OLD_VALUE_MASK) != 0;
-      if (this.hasOldValue) {
+      hasDelta = (bits & DELTA_MASK) != 0;
+      hasOldValue = (bits & OLD_VALUE_MASK) != 0;
+      if (hasOldValue) {
         byte b = in.readByte();
         if (b == 0) {
-          this.oldValueIsSerialized = false;
+          oldValueIsSerialized = false;
         } else if (b == 1) {
-          this.oldValueIsSerialized = true;
+          oldValueIsSerialized = true;
         } else {
           throw new IllegalStateException("expected 0 or 1");
         }
-        this.oldValue = DataSerializer.readByteArray(in);
+        oldValue = DataSerializer.readByteArray(in);
       }
       boolean hasFilterInfo = (bits & FILTER_INFO_MASK) != 0;
-      this.needsRouting = (bits & NEEDS_ROUTING_MASK) != 0;
+      needsRouting = (bits & NEEDS_ROUTING_MASK) != 0;
       if (hasFilterInfo) {
-        this.filterRouting = new FilterRoutingInfo();
-        InternalDataSerializer.invokeFromData(this.filterRouting, in);
+        filterRouting = new FilterRoutingInfo();
+        InternalDataSerializer.invokeFromData(filterRouting, in);
       }
       if ((bits & VERSION_TAG_MASK) != 0) {
         boolean persistentTag = (bits & PERSISTENT_TAG_MASK) != 0;
-        this.versionTag = VersionTag.create(persistentTag, in);
+        versionTag = VersionTag.create(persistentTag, in);
       }
       if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) {
-        this.inhibitAllNotifications = true;
+        inhibitAllNotifications = true;
       }
     }
 
@@ -1423,68 +1405,68 @@ public abstract class DistributedCacheOperation {
       extendedBits = computeCompressedExtBits(extendedBits);
       out.writeShort(bits);
       out.writeShort(extendedBits);
-      if (this.processorId > 0) {
-        out.writeInt(this.processorId);
+      if (processorId > 0) {
+        out.writeInt(processorId);
       }
 
-      DataSerializer.writeString(this.regionPath, out);
-      out.writeByte(this.op.ordinal);
-      if (this.callbackArg != null) {
-        DataSerializer.writeObject(this.callbackArg, out);
+      DataSerializer.writeString(regionPath, out);
+      out.writeByte(op.ordinal);
+      if (callbackArg != null) {
+        DataSerializer.writeObject(callbackArg, out);
       }
-      if (this.hasOldValue) {
-        out.writeByte(this.oldValueIsSerialized ? 1 : 0);
+      if (hasOldValue) {
+        out.writeByte(oldValueIsSerialized ? 1 : 0);
         // the receiving side expects that the old value will have been serialized
         // as a byte array
-        final byte policy = valueIsToDeserializationPolicy(this.oldValueIsSerialized);
+        final byte policy = valueIsToDeserializationPolicy(oldValueIsSerialized);
         final Object vObj;
         final byte[] vBytes;
-        if (!this.oldValueIsSerialized && this.oldValue instanceof byte[]) {
+        if (!oldValueIsSerialized && oldValue instanceof byte[]) {
           vObj = null;
-          vBytes = (byte[]) this.oldValue;
+          vBytes = (byte[]) oldValue;
         } else {
-          vObj = this.oldValue;
+          vObj = oldValue;
           vBytes = null;
         }
         writeValue(policy, vObj, vBytes, out);
       }
-      if (this.filterRouting != null) {
-        InternalDataSerializer.invokeToData(this.filterRouting, out);
+      if (filterRouting != null) {
+        InternalDataSerializer.invokeToData(filterRouting, out);
       }
-      if (this.versionTag != null) {
-        InternalDataSerializer.invokeToData(this.versionTag, out);
+      if (versionTag != null) {
+        InternalDataSerializer.invokeToData(versionTag, out);
       }
     }
 
     protected short computeCompressedShort(short bits) {
-      if (this.hasOldValue) {
+      if (hasOldValue) {
         bits |= OLD_VALUE_MASK;
       }
-      if (this.directAck) {
+      if (directAck) {
         bits |= DIRECT_ACK_MASK;
       }
-      if (this.possibleDuplicate) {
+      if (possibleDuplicate) {
         bits |= POSSIBLE_DUPLICATE_MASK;
       }
-      if (this.processorId != 0) {
+      if (processorId != 0) {
         bits |= HAS_PROCESSOR_ID;
       }
-      if (this.callbackArg != null) {
+      if (callbackArg != null) {
         bits |= CALLBACK_ARG_MASK;
       }
-      if (this.hasDelta) {
+      if (hasDelta) {
         bits |= DELTA_MASK;
       }
-      if (this.filterRouting != null) {
+      if (filterRouting != null) {
         bits |= FILTER_INFO_MASK;
       }
-      if (this.needsRouting) {
+      if (needsRouting) {
         bits |= NEEDS_ROUTING_MASK;
       }
-      if (this.versionTag != null) {
+      if (versionTag != null) {
         bits |= VERSION_TAG_MASK;
       }
-      if (this.versionTag instanceof DiskVersionTag) {
+      if (versionTag instanceof DiskVersionTag) {
         bits |= PERSISTENT_TAG_MASK;
       }
       if (inhibitAllNotifications) {
@@ -1502,14 +1484,14 @@ public abstract class DistributedCacheOperation {
 
     protected void setFlags(short bits, DataInput in) throws IOException, ClassNotFoundException {
       if ((bits & HAS_PROCESSOR_ID) != 0) {
-        this.processorId = in.readInt();
-        ReplyProcessor21.setMessageRPId(this.processorId);
+        processorId = in.readInt();
+        ReplyProcessor21.setMessageRPId(processorId);
       }
     }
 
     @Override
     public boolean supportsDirectAck() {
-      return this.directAck;
+      return directAck;
     }
 
     public void setSendDelta(boolean sendDelta) {
@@ -1533,16 +1515,16 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void importOldObject(Object ov, boolean isSerialized) {
-      this.oldValueIsSerialized = isSerialized;
-      this.oldValue = ov;
-      this.hasOldValue = true;
+      oldValueIsSerialized = isSerialized;
+      oldValue = ov;
+      hasOldValue = true;
     }
 
     @Override
     public void importOldBytes(byte[] ov, boolean isSerialized) {
-      this.oldValueIsSerialized = isSerialized;
-      this.oldValue = ov;
-      this.hasOldValue = true;
+      oldValueIsSerialized = isSerialized;
+      oldValue = ov;
+      hasOldValue = true;
     }
 
     protected boolean notifiesSerialGatewaySender(ClusterDistributionManager dm) {
@@ -1564,26 +1546,22 @@ public abstract class DistributedCacheOperation {
   /** Custom subclass that keeps all ReplyExceptions */
   private static class ReliableCacheReplyProcessor extends CacheOperationReplyProcessor {
 
-    private final Set failedMembers;
+    private final Set<InternalDistributedMember> failedMembers;
 
-    private final DistributionManager dm;
-
-    public ReliableCacheReplyProcessor(InternalDistributedSystem system, Collection initMembers,
-        Set departedMembers) {
+    public ReliableCacheReplyProcessor(InternalDistributedSystem system,
+        Collection<InternalDistributedMember> initMembers,
+        Set<InternalDistributedMember> departedMembers) {
       super(system, initMembers);
-      this.dm = system.getDistributionManager();
-      this.failedMembers = departedMembers;
+      failedMembers = departedMembers;
     }
 
     @Override
     protected synchronized void processException(DistributionMessage dmsg, ReplyException ex) {
       Throwable cause = ex.getCause();
       // only interested in CacheClosedException and RegionDestroyedException
-      if (cause instanceof CancelException || cause instanceof RegionDestroyedException) {
-        this.failedMembers.add(dmsg.getSender());
-      } else {
+      failedMembers.add(dmsg.getSender());
+      if (!(cause instanceof CancelException) && !(cause instanceof RegionDestroyedException)) {
         // allow superclass to handle all other exceptions
-        this.failedMembers.add(dmsg.getSender());
         super.processException(dmsg, ex);
       }
     }
@@ -1594,7 +1572,7 @@ public abstract class DistributedCacheOperation {
         if (logger.isDebugEnabled()) {
           logger.debug("{} replied with ignored true", dmsg.getSender());
         }
-        this.failedMembers.add(dmsg.getSender());
+        failedMembers.add(dmsg.getSender());
       }
       super.process(dmsg, warn);
     }
@@ -1605,7 +1583,8 @@ public abstract class DistributedCacheOperation {
 
     public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
 
-    public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
+    public CacheOperationReplyProcessor(InternalDistributedSystem system,
+        Collection<InternalDistributedMember> initMembers) {
       super(system, initMembers);
     }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
index 606fac6..ff4bb2a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
@@ -41,13 +41,14 @@ public class DistributedCacheOperationTest {
   @Test
   public void shouldBeMockable() throws Exception {
     DistributedCacheOperation mockDistributedCacheOperation = mock(DistributedCacheOperation.class);
+    @SuppressWarnings("unused") // forces CacheOperationMessage to be mockable
     CacheOperationMessage mockCacheOperationMessage = mock(CacheOperationMessage.class);
     Map<InternalDistributedMember, PersistentMemberID> persistentIds = new HashMap<>();
     when(mockDistributedCacheOperation.supportsDirectAck()).thenReturn(false);
 
-    mockDistributedCacheOperation.waitForAckIfNeeded(mockCacheOperationMessage, persistentIds);
+    mockDistributedCacheOperation.waitForAckIfNeeded(persistentIds);
 
-    verify(mockDistributedCacheOperation, times(1)).waitForAckIfNeeded(mockCacheOperationMessage,
+    verify(mockDistributedCacheOperation, times(1)).waitForAckIfNeeded(
         persistentIds);
 
     assertThat(mockDistributedCacheOperation.supportsDirectAck()).isFalse();
@@ -80,7 +81,7 @@ public class DistributedCacheOperationTest {
     boolean endOperationInvoked;
     DistributedRegion region;
 
-    public TestOperation(CacheEvent event) {
+    public TestOperation(CacheEvent<?, ?> event) {
       super(event);
     }
 
@@ -114,7 +115,7 @@ public class DistributedCacheOperationTest {
   @Test
   public void testDoRemoveDestroyTokensFromCqResultKeys() {
     Object key = new Object();
-    HashMap hashMap = new HashMap();
+    HashMap<Long, Integer> hashMap = new HashMap<>();
     hashMap.put(1L, MessageType.LOCAL_DESTROY);
     EntryEventImpl baseEvent = mock(EntryEventImpl.class);
     ServerCQ serverCQ = mock(ServerCQ.class);
@@ -123,7 +124,7 @@ public class DistributedCacheOperationTest {
         new DestroyOperation(baseEvent);
     when(baseEvent.getKey()).thenReturn(key);
     when(filterInfo.getCQs()).thenReturn(hashMap);
-    when(serverCQ.getFilterID()).thenReturn(new Long(1L));
+    when(serverCQ.getFilterID()).thenReturn(1L);
     doNothing().when(serverCQ).removeFromCqResultKeys(isA(Object.class), isA(Boolean.class));
 
     distributedCacheOperation.doRemoveDestroyTokensFromCqResultKeys(filterInfo, serverCQ);