You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/04 00:07:33 UTC

[19/54] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache

http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 172fabe..5f8d238 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -49,13 +49,13 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    * {@link DistTXStateOnCoordinator} (in case of TX coordinator is also a data node)
    */
   protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals =
-      new HashMap<>();
+    new HashMap<>();
   private HashMap<LocalRegion, DistributedMember> rrTargets;
   private Set<DistributedMember> txRemoteParticpants = null; // other than local
   protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
 
   public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
-      InternalDistributedMember clientMember) {
+                                           InternalDistributedMember clientMember) {
     super(managerImpl, id, clientMember);
   }
 
@@ -65,9 +65,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see org.apache.geode.internal.cache.TXStateInterface#commit()
-   * 
+   *
    * [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach
    * all
    */
@@ -79,12 +79,12 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       // create a map of secondary(for PR) / replica(for RR) to stubs to send
       // commit message to those
       HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals =
-          getSecondariesAndReplicasForTxOps();
+        getSecondariesAndReplicasForTxOps();
       // add it to the existing map and then send commit to all copies
       target2realDeals.putAll(otherTargets2realDeals);
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
+          "DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
       }
 
       precommitResult = doPrecommit();
@@ -95,17 +95,17 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
         boolean phase2commitDone = doCommit();
         if (logger.isDebugEnabled()) {
           logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
-              + (phase2commitDone ? "Done" : "Failed"));
+                       + (phase2commitDone ? "Done" : "Failed"));
         }
         // [DISTTX] TODO Handle this exception well
         if (!phase2commitDone) {
           throw new TransactionInDoubtException(
-              LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
+            LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
         }
       } else {
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
+            "DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
         }
       }
     } catch (CommitConflictException e) {
@@ -133,30 +133,30 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    */
   private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
     final GemFireCacheImpl cache =
-        GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
+      GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
     InternalDistributedMember currentNode =
-        cache.getInternalDistributedSystem().getDistributedMember();
+      cache.getInternalDistributedSystem().getDistributedMember();
 
     HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals =
-        new HashMap<>();
+      new HashMap<>();
     for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals.entrySet()) {
       DistributedMember originalTarget = e.getKey();
       DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
 
       ArrayList<DistTxEntryEvent> primaryTxOps =
-          distPeerTxStateStub.getPrimaryTransactionalOperations();
+        distPeerTxStateStub.getPrimaryTransactionalOperations();
       for (DistTxEntryEvent dtop : primaryTxOps) {
         LocalRegion lr = dtop.getRegion();
         // replicas or secondaries
         Set<InternalDistributedMember> otherNodes = null;
         if (lr instanceof PartitionedRegion) {
           Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop.getRegion())
-              .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
+            .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
           allNodes.remove(originalTarget);
           otherNodes = allNodes;
         } else if (lr instanceof DistributedRegion) {
           otherNodes =
-              ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
+            ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
           otherNodes.remove(originalTarget);
         }
 
@@ -204,20 +204,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
       // create processor and rollback message
       DistTXRollbackMessage.DistTxRollbackReplyProcessor processor =
-          new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
-              txRemoteParticpants, target2realDeals);
+        new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
+          txRemoteParticpants, target2realDeals);
       // TODO [DISTTX} whats ack threshold?
       processor.enableSevereAlertProcessing();
       final DistTXRollbackMessage rollbackMsg =
-          new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+        new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
 
       // send rollback message to remote nodes
       for (DistributedMember remoteNode : txRemoteParticpants) {
         DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
         if (remoteTXStateStub.isTxState()) {
           throw new UnsupportedOperationInTransactionException(
-              LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
-                  remoteTXStateStub.getClass().getSimpleName()));
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+              remoteTXStateStub.getClass().getSimpleName()));
         }
         try {
           remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
@@ -236,14 +236,14 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       if (localTXState != null) {
         if (!localTXState.isTxState()) {
           throw new UnsupportedOperationInTransactionException(
-              LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
-                  localTXState.getClass().getSimpleName()));
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+              localTXState.getClass().getSimpleName()));
         }
         localTXState.rollback();
         boolean localResult = localTXState.getRollbackResponse();
         if (logger.isDebugEnabled()) {
           logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = " + dm.getId()
-              + " ,result= " + localResult + " ,finalResult-old= " + finalResult);
+                       + " ,result= " + localResult + " ,finalResult-old= " + finalResult);
         }
         finalResult = finalResult && localResult;
       }
@@ -280,7 +280,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
           Boolean remoteResult = e.getValue();
           if (logger.isDebugEnabled()) { // TODO - make this trace level
             logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = " + target
-                + " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
+                         + " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
           }
           finalResult = finalResult && remoteResult;
         }
@@ -295,7 +295,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     /*
      * [DISTTX] TODO Write similar method to take out exception
-     * 
+     *
      * [DISTTX] TODO Handle Reliable regions
      */
     // if (this.hasReliableRegions) {
@@ -339,8 +339,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
           if (r.scope.isDistributed()) {
             if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
               logger.warn(LocalizedMessage.create(
-                  LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
-                  r.getFullPath()));
+                LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
+                r.getFullPath()));
             }
           }
         } else {
@@ -354,20 +354,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
-            this.realDeal, this.txMgr.getDM().getId(), this, target, new Throwable());
+          "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
+          this.realDeal, this.txMgr.getDM().getId(), this, target, new Throwable());
       }
       target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
       if (logger.isDebugEnabled()) {
         logger
-            .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
-                + target2realDeals);
+          .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
+                 + target2realDeals);
       }
     } else {
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
-            this.realDeal, this, target, target2realDeals);
+          "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
+          this.realDeal, this, target, target2realDeals);
       }
     }
     return this.realDeal;
@@ -382,25 +382,25 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       this.realDeal = new DistPeerTXStateStub(this, target, onBehalfOfClientMember);
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
-            this.realDeal, this.txMgr.getDM().getId());
+          "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
+          this.realDeal, this.txMgr.getDM().getId());
       }
       if (!this.realDeal.isDistTx() || this.realDeal.isCreatedOnDistTxCoordinator()
           || !this.realDeal.isTxState()) {
         throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
-            .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
+          .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
       }
       target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
-                + target2realDeals);
+          "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+          + target2realDeals);
       }
     } else {
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
-            this.realDeal, this, target, target2realDeals);
+          "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
+          this.realDeal, this, target, target2realDeals);
       }
     }
     return this.realDeal;
@@ -429,8 +429,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       this.txRemoteParticpants.remove(dm.getId());
       if (logger.isDebugEnabled()) {
         logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticpants = "
-            + txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
-            + dm.getId());
+                     + txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
+                     + dm.getId());
       }
     }
     return txRemoteParticpants;
@@ -444,20 +444,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     // create processor and precommit message
     DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor =
-        new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
-            txRemoteParticpants, target2realDeals);
+      new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
+        txRemoteParticpants, target2realDeals);
     // TODO [DISTTX} whats ack threshold?
     processor.enableSevereAlertProcessing();
     final DistTXPrecommitMessage precommitMsg =
-        new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+      new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
 
     // send precommit message to remote nodes
     for (DistributedMember remoteNode : txRemoteParticpants) {
       DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
       if (remoteTXStateStub.isTxState()) {
         throw new UnsupportedOperationInTransactionException(
-            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
-                remoteTXStateStub.getClass().getSimpleName()));
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+            remoteTXStateStub.getClass().getSimpleName()));
       }
       try {
         remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
@@ -467,7 +467,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
+          "DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
       }
     }
 
@@ -477,29 +477,29 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
     if (localTXState != null) {
       if (!localTXState.isTxState()) {
         throw new UnsupportedOperationInTransactionException(
-            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
-                localTXState.getClass().getSimpleName()));
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+            localTXState.getClass().getSimpleName()));
       }
       localTXState.precommit();
       boolean localResult = localTXState.getPreCommitResponse();
       TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap =
-          new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+        new TreeMap<String, ArrayList<DistTxThinEntryState>>();
       ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
       if (localResult) {
         localResult = ((DistTXStateOnCoordinator) localTXState)
-            .populateDistTxEntryStateList(entryStateSortedMap);
+          .populateDistTxEntryStateList(entryStateSortedMap);
         if (localResult) {
           entryEventList =
-              new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
+            new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
           populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
         }
       }
 
       if (logger.isDebugEnabled()) {
         logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = " + dm.getId()
-            + " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
-            + printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
-            + " ,finalResult-old= " + finalResult);
+                     + " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
+                     + printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
+                     + " ,finalResult-old= " + finalResult);
       }
       finalResult = finalResult && localResult;
     }
@@ -531,19 +531,19 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       // dm.getStats().incCommitWaits();
 
       Map<DistributedMember, DistTxPrecommitResponse> remoteResults =
-          processor.getCommitResponseMap();
+        processor.getCommitResponseMap();
       for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) {
         DistributedMember target = e.getKey();
         DistTxPrecommitResponse remoteResponse = e.getValue();
         ArrayList<ArrayList<DistTxThinEntryState>> entryEventList =
-            remoteResponse.getDistTxEntryEventList();
+          remoteResponse.getDistTxEntryEventList();
         populateEntryEventMap(target, entryEventList, sortedRegionName);
         if (logger.isDebugEnabled()) {
           logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = "
-              + target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
-              + printEntryEventList(entryEventList) + " ,txEntryEventMap="
-              + printEntryEventMap(this.txEntryEventMap) + " ,result= "
-              + remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
+                       + target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
+                       + printEntryEventList(entryEventList) + " ,txEntryEventMap="
+                       + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+                       + remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
         }
         finalResult = finalResult && remoteResponse.getCommitState();
       }
@@ -551,7 +551,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     /*
      * [DISTTX] TODO Write similar method to take out exception
-     * 
+     *
      * [DISTTX] TODO Handle Reliable regions
      */
     // if (this.hasReliableRegions) {
@@ -566,11 +566,11 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
   /*
    * Handle response of precommit reply
-   * 
+   *
    * Go over list of region versions for this target and fill map
    */
   private void populateEntryEventMap(DistributedMember target,
-      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
+                                     ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
     if (this.txEntryEventMap == null) {
       this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
     }
@@ -583,10 +583,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
       if (sortedRegionName.size() != entryEventList.size()) {
         throw new UnsupportedOperationInTransactionException(
-            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
-                "size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
-                    + " for target=" + target,
-                entryEventList.size() + " {" + entryEventList + "}"));
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+            "size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
+            + " for target=" + target,
+            entryEventList.size() + " {" + entryEventList + "}"));
       }
 
       int index = 0;
@@ -601,7 +601,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    * Populate list of regions for this target, while sending commit messages
    */
   private void populateEntryEventList(DistributedMember target,
-      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
+                                      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
     DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
     sortedRegionMap.clear();
     distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
@@ -612,7 +612,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap.get(rName);
       if (entryStates == null) {
         throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
-            .toLocalizedString("entryStates for " + rName + " at target " + target, "null"));
+          .toLocalizedString("entryStates for " + rName + " at target " + target, "null"));
       }
       entryEventList.add(entryStates);
     }
@@ -631,12 +631,12 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     // create processor and commit message
     DistTXCommitMessage.DistTxCommitReplyProcessor processor =
-        new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
-            target2realDeals);
+      new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
+        target2realDeals);
     // TODO [DISTTX} whats ack threshold?
     processor.enableSevereAlertProcessing();
     final DistTXCommitMessage commitMsg =
-        new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+      new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
 
     // send commit message to remote nodes
     ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
@@ -645,8 +645,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
       if (remoteTXStateStub.isTxState()) {
         throw new UnsupportedOperationInTransactionException(
-            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
-                remoteTXStateStub.getClass().getSimpleName()));
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+            remoteTXStateStub.getClass().getSimpleName()));
       }
       try {
         populateEntryEventList(remoteNode, entryEventList, sortedRegionName);
@@ -659,9 +659,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       }
       if (logger.isDebugEnabled()) {
         logger.debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = "
-            + remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
-            + printEntryEventList(entryEventList) + " ,txEntryEventMap="
-            + printEntryEventMap(this.txEntryEventMap));
+                     + remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
+                     + printEntryEventList(entryEventList) + " ,txEntryEventMap="
+                     + printEntryEventMap(this.txEntryEventMap));
       }
     }
 
@@ -670,8 +670,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
     if (localTXState != null) {
       if (!localTXState.isTxState()) {
         throw new UnsupportedOperationInTransactionException(
-            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
-                localTXState.getClass().getSimpleName()));
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+            localTXState.getClass().getSimpleName()));
       }
       populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
       ((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
@@ -679,10 +679,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       TXCommitMessage localResultMsg = localTXState.getCommitMessage();
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
-                + sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
-                + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
-                + (localResultMsg != null) + " ,finalResult-old= " + finalResult);
+          "DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
+          + sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
+          + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+          + (localResultMsg != null) + " ,finalResult-old= " + finalResult);
       }
       finalResult = finalResult && (localResultMsg != null);
     }
@@ -719,8 +719,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
         TXCommitMessage remoteResultMsg = e.getValue();
         if (logger.isDebugEnabled()) { // TODO - make this trace level
           logger.debug(
-              "DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
-                  + " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
+            "DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
+            + " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
         }
         finalResult = finalResult && remoteResultMsg != null;
       }
@@ -728,7 +728,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     /*
      * [DISTTX] TODO Write similar method to take out exception
-     * 
+     *
      * [DISTTX] TODO Handle Reliable regions
      */
     // if (this.hasReliableRegions) {
@@ -748,7 +748,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    */
   @Override
   public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
-      LocalRegion region) {
+                         LocalRegion region) {
     if (putallOp.putAllData.length == 0) {
       return;
     }
@@ -756,22 +756,22 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       super.postPutAll(putallOp, successfulPuts, region);
     } else {
       region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
-                                                               // #43651
+      // #43651
 
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator.postPutAll "
-                + "processing putAll op for region {}, size of putAllOp " + "is {}",
-            region, putallOp.putAllData.length);
+          "DistTXStateProxyImplOnCoordinator.postPutAll "
+          + "processing putAll op for region {}, size of putAllOp " + "is {}",
+          region, putallOp.putAllData.length);
       }
 
 
       // map of bucketId to putall op for this bucket
       HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap =
-          new HashMap<Integer, DistributedPutAllOperation>();
+        new HashMap<Integer, DistributedPutAllOperation>();
       // map of bucketId to TXStateStub for target that hosts this bucket
       HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
-          new HashMap<Integer, DistTXCoordinatorInterface>();
+        new HashMap<Integer, DistTXCoordinatorInterface>();
 
       // separate the putall op per bucket
       for (int i = 0; i < putallOp.putAllData.length; i++) {
@@ -783,10 +783,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
         if (putAllForBucket == null) {
           // TODO DISTTX: event is never released
           EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
-              Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue());
+            Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue());
           event.setEventId(putallOp.putAllData[i].getEventID());
           putAllForBucket =
-              new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
+            new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
           bucketToPutallMap.put(bucketId, putAllForBucket);
         }
         putallOp.putAllData[i].setFakeEventID();
@@ -811,9 +811,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "DistTXStateProxyImplOnCoordinator.postPutAll processing"
-                  + " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
-              bucketId, dtsi, putAllForBucket);
+            "DistTXStateProxyImplOnCoordinator.postPutAll processing"
+            + " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
+            bucketId, dtsi, putAllForBucket);
         }
         dtsi.postPutAll(putAllForBucket, successfulPuts, region);
       }
@@ -827,7 +827,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
    */
   @Override
   public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
-      LocalRegion region) {
+                            LocalRegion region) {
     if (op.removeAllData.length == 0) {
       return;
     }
@@ -835,20 +835,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       super.postRemoveAll(op, successfulOps, region);
     } else {
       region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
-                                                               // #43651
+      // #43651
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "DistTXStateProxyImplOnCoordinator.postRemoveAll "
-                + "processing removeAll op for region {}, size of removeAll " + "is {}",
-            region, op.removeAllDataSize);
+          "DistTXStateProxyImplOnCoordinator.postRemoveAll "
+          + "processing removeAll op for region {}, size of removeAll " + "is {}",
+          region, op.removeAllDataSize);
       }
 
       // map of bucketId to removeAll op for this bucket
       HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap =
-          new HashMap<Integer, DistributedRemoveAllOperation>();
+        new HashMap<Integer, DistributedRemoveAllOperation>();
       // map of bucketId to TXStateStub for target that hosts this bucket
       HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
-          new HashMap<Integer, DistTXCoordinatorInterface>();
+        new HashMap<Integer, DistTXCoordinatorInterface>();
 
       // separate the removeAll op per bucket
       for (int i = 0; i < op.removeAllData.length; i++) {
@@ -862,7 +862,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
           EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, region, key);
           event.setEventId(op.removeAllData[i].getEventID());
           removeAllForBucket =
-              new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
+            new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
           bucketToRemoveAllMap.put(bucketId, removeAllForBucket);
         }
         op.removeAllData[i].setFakeEventID();
@@ -887,9 +887,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
-                  + " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
-              bucketId, dtsi, removeAllForBucket);
+            "DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+            + " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
+            bucketId, dtsi, removeAllForBucket);
         }
         dtsi.postRemoveAll(removeAllForBucket, successfulOps, region);
       }
@@ -903,13 +903,13 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
   }
 
   public static String printEntryEventMap(
-      HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
+    HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
     StringBuilder str = new StringBuilder();
     str.append(" (");
     str.append(txRegionVersionsMap.size());
     str.append(")=[ ");
     for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap
-        .entrySet()) {
+      .entrySet()) {
       str.append(" {").append(entry.getKey());
       str.append(":").append("size(").append(entry.getValue().size()).append(")");
       str.append("=").append(entry.getValue()).append("}, ");
@@ -919,7 +919,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
   }
 
   public static String printEntryEventList(
-      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+    ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
     StringBuilder str = new StringBuilder();
     str.append(" (");
     str.append(entryEventList.size());

http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
 import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.util.DelayedAction;
 
-/**
- * 
- */
 public abstract class DistributedCacheOperation {
 
   private static final Logger logger = LogService.getLogger();
 
   public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
   public static Random LOSS_SIMULATION_GENERATOR;
 
   public static long SLOW_DISTRIBUTION_MS = 0; // test hook
 
   // constants used in subclasses and distribution messages
   // should use enum in source level 1.5+
+
   /**
    * Deserialization policy: do not deserialize (for byte array, null or cases where the value
    * should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
   }
 
 
-  public final static byte DESERIALIZATION_POLICY_NUMBITS =
+  public static final byte DESERIALIZATION_POLICY_NUMBITS =
       DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
 
   public static final short DESERIALIZATION_POLICY_END =
       (short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
   public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1);
 
   public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
     try {
       _distribute();
     } catch (InvalidVersionException e) {
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
         logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
       }
 
@@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation {
     DistributedRegion region = getRegion();
     if (viewVersion != -1) {
       region.getDistributionAdvisor().endOperation(viewVersion);
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
         logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
             viewVersion);
       }
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
     if (SLOW_DISTRIBUTION_MS > 0) { // test hook
       try {
         Thread.sleep(SLOW_DISTRIBUTION_MS);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
       }
       SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
       }
 
       // some members requiring old value are also in the cache op recipients set
-      Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+      Set needsOldValueInCacheOp = Collections.emptySet();
 
       // set client routing information into the event
       boolean routingComputed = false;
       FilterRoutingInfo filterRouting = null;
       // recipients that will get a cacheop msg and also a PR message
-      Set twoMessages = Collections.EMPTY_SET;
+      Set twoMessages = Collections.emptySet();
       if (region.isUsedForPartitionedRegionBucket()) {
-        twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+        twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
         routingComputed = true;
         filterRouting = getRecipientFilterRouting(recipients);
         if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {
 
       // some members need PR notification of the change for client/wan
       // notification
-      Set adjunctRecipients = Collections.EMPTY_SET;
+      Set adjunctRecipients = Collections.emptySet();
 
       // Partitioned region listener notification messages piggyback on this
       // operation's replyprocessor and need to be sent at the same time as
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
         recipients.removeAll(needsOldValueInCacheOp);
       }
 
-      Set cachelessNodes = Collections.EMPTY_SET;
-      Set adviseCacheServers = Collections.EMPTY_SET;
-      Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
-          new HashSet<InternalDistributedMember>();
+      Set cachelessNodes = Collections.emptySet();
+      Set adviseCacheServers;
+      Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
       if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
         cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
         if (!cachelessNodes.isEmpty()) {
           List list = new ArrayList(cachelessNodes);
           for (Object member : cachelessNodes) {
-            if (!recipients.contains(member)) {
+            if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
               // Don't include those originally excluded.
               list.remove(member);
-            } else if (adjunctRecipients.contains(member)) {
-              list.remove(member);
             }
           }
           cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
         if (!reliableOp || region.isNoDistributionOk()) {
           // nothing needs be done in this case
         } else {
-          region.handleReliableDistribution(Collections.EMPTY_SET);
+          region.handleReliableDistribution(Collections.emptySet());
         }
 
-        /** compute local client routing before waiting for an ack only for a bucket */
+        // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
           this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
       } else {
         boolean directAck = false;
         boolean useMulticast = region.getMulticastEnabled()
-            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
         boolean shouldAck = shouldAck();
 
         if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
                     recipients);
               }
               waitForMembers.removeAll(recipients);
-              recipients = Collections.EMPTY_SET;
+              recipients = Collections.emptySet();
             }
           }
           if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
           }
 
           adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
-          adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+          adviseCacheServers = ((Bucket) region).getPartitionedRegion()
               .getCacheDistributionAdvisor().adviseCacheServers();
           adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
 
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
           }
         }
 
-        /** compute local client routing before waiting for an ack only for a bucket */
+        // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
           event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-
   /**
    * Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
    * key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
         continue;
       }
 
-      CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+      CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
           .getCacheDistributionAdvisor().getProfile(m);
 
       if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
         continue;
       }
 
-
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;
 
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
           Long cqID = e.getKey();
           // For the CQs satisfying the event with destroy CQEvent, remove
           // the entry form CQ cache.
-          if (cq.getFilterID() == cqID
-              && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
-            cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+          if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
           }
         }
       }
     }
   }
 
-
   /**
    * Get the adjunct receivers for a partitioned region operation
    * 
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {
 
   /**
    * perform any operation-specific initialization on the given reply processor
-   * 
-   * @param p
-   * @param msg
    */
   protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
     // nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-  /**
-   * @param closedMembers
-   */
   private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
       Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
     if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
       return null;
     }
     CacheDistributionAdvisor advisor;
-    // if (region.isUsedForPartitionedRegionBucket()) {
-    advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
-    // } else {
-    // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
-    // }
+    advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
     return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
   }
 
@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
     protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);
     protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
 
-
     private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
 
     public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
       return this.op;
     }
 
-
     /** sets the concurrency versioning tag for this message */
     public void setVersionTag(VersionTag tag) {
       this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
     /**
      * process a reply
      * 
-     * @param reply
-     * @param processor
      * @return true if the reply-processor should continue to process this response
      */
     boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
      * @param event the entry event that contains the old value
      */
     public void appendOldValueToMessage(EntryEventImpl event) {
-      {
-        @Unretained
-        Object val = event.getRawOldValue();
-        if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
-            || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
-          return;
-        }
+      @Unretained
+      Object val = event.getRawOldValue();
+      if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+          || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+        return;
       }
       event.exportOldValue(this);
     }
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {
 
     protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
       Assert.assertTrue(this.regionPath != null, "regionPath was null");
-      GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+      InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
       return gfc.getRegionByPathForProcessing(this.regionPath);
     }
 
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
         basicProcess(dm, lclRgn);
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
             // region
             if (!rgn.isEventTrackerInitialized()
                 && (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
-              if (logger.isDebugEnabled()) {
+              if (logger.isTraceEnabled()) {
                 logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
               }
               return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
           sendReply = operateOnRegion(event, dm) && sendReply;
         } finally {
           if (event instanceof EntryEventImpl) {
-            ((EntryEventImpl) event).release();
+            ((Releasable) event).release();
           }
         }
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Region destroyed: nothing to do", this);
         }
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
         if (!lclRgn.isDestroyed()) {
           logger.error("Got disk access exception, expected region to be destroyed", e);
         }
-      } catch (EntryNotFoundException e) {
+      } catch (EntryNotFoundException ignore) {
         this.appliedOperation = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
       if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
         // distributed-no-ack message. Don't respond
       } else {
-        ReplyException exception = rex;
-        ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+        ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
             isInternal());
       }
     }
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
      * When an event is discarded because of an attempt to overwrite a more recent change we still
      * need to deliver that event to clients. Clients can then perform their own concurrency checks
      * on the event.
-     * 
-     * @param rgn
-     * @param ev
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation {
       rgn.notifyBridgeClients(ev);
     }
 
-    // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
-    // String path) {
-    // return LocalRegion.getRegionFromPath(sys, path);
-    // }
-
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
         throws EntryNotFoundException;
 
@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-      // super.fromData(in);
       short bits = in.readShort();
       short extBits = in.readShort();
       this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void toData(DataOutput out) throws IOException {
-      // super.toData(out);
-
       short bits = 0;
       short extendedBits = 0;
       bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
   static class CacheOperationReplyProcessor extends DirectReplyProcessor {
     public CacheOperationMessage msg;
 
-    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
-        new CopyOnWriteHashSet<InternalDistributedMember>();
+    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
 
     public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
       super(system, initMembers);