You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/09 21:09:19 UTC

[31/50] geode git commit: GEODE-2632: change dependencies on GemFireCacheImpl to InternalCache

http://git-wip-us.apache.org/repos/asf/geode/blob/d888c75e/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 172fabe..aa40508 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -65,9 +65,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see org.apache.geode.internal.cache.TXStateInterface#commit()
-   * 
+   *
    * [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach
    * all
    */
@@ -295,7 +295,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     /*
      * [DISTTX] TODO Write similar method to take out exception
-     * 
+     *
      * [DISTTX] TODO Handle Reliable regions
      */
     // if (this.hasReliableRegions) {
@@ -551,7 +551,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     /*
      * [DISTTX] TODO Write similar method to take out exception
-     * 
+     *
      * [DISTTX] TODO Handle Reliable regions
      */
     // if (this.hasReliableRegions) {
@@ -566,7 +566,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
   /*
    * Handle response of precommit reply
-   * 
+   *
    * Go over list of region versions for this target and fill map
    */
   private void populateEntryEventMap(DistributedMember target,
@@ -728,7 +728,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     /*
      * [DISTTX] TODO Write similar method to take out exception
-     * 
+     *
      * [DISTTX] TODO Handle Reliable regions
      */
     // if (this.hasReliableRegions) {
@@ -756,7 +756,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       super.postPutAll(putallOp, successfulPuts, region);
     } else {
       region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
-                                                               // #43651
+      // #43651
 
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -835,7 +835,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       super.postRemoveAll(op, successfulOps, region);
     } else {
       region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
-                                                               // #43651
+      // #43651
       if (logger.isDebugEnabled()) {
         logger.debug(
             "DistTXStateProxyImplOnCoordinator.postRemoveAll "

http://git-wip-us.apache.org/repos/asf/geode/blob/d888c75e/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
 import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
 import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
 import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.util.DelayedAction;
 
-/**
- * 
- */
 public abstract class DistributedCacheOperation {
 
   private static final Logger logger = LogService.getLogger();
 
   public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
   public static Random LOSS_SIMULATION_GENERATOR;
 
   public static long SLOW_DISTRIBUTION_MS = 0; // test hook
 
   // constants used in subclasses and distribution messages
   // should use enum in source level 1.5+
+
   /**
    * Deserialization policy: do not deserialize (for byte array, null or cases where the value
    * should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
   }
 
 
-  public final static byte DESERIALIZATION_POLICY_NUMBITS =
+  public static final byte DESERIALIZATION_POLICY_NUMBITS =
       DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
 
   public static final short DESERIALIZATION_POLICY_END =
       (short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
   public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1);
 
   public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
     try {
       _distribute();
     } catch (InvalidVersionException e) {
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
         logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
       }
 
@@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation {
     DistributedRegion region = getRegion();
     if (viewVersion != -1) {
       region.getDistributionAdvisor().endOperation(viewVersion);
-      if (logger.isDebugEnabled()) {
+      if (logger.isTraceEnabled()) {
         logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
             viewVersion);
       }
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
     if (SLOW_DISTRIBUTION_MS > 0) { // test hook
       try {
         Thread.sleep(SLOW_DISTRIBUTION_MS);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
       }
       SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
       }
 
       // some members requiring old value are also in the cache op recipients set
-      Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+      Set needsOldValueInCacheOp = Collections.emptySet();
 
       // set client routing information into the event
       boolean routingComputed = false;
       FilterRoutingInfo filterRouting = null;
       // recipients that will get a cacheop msg and also a PR message
-      Set twoMessages = Collections.EMPTY_SET;
+      Set twoMessages = Collections.emptySet();
       if (region.isUsedForPartitionedRegionBucket()) {
-        twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+        twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
         routingComputed = true;
         filterRouting = getRecipientFilterRouting(recipients);
         if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {
 
       // some members need PR notification of the change for client/wan
       // notification
-      Set adjunctRecipients = Collections.EMPTY_SET;
+      Set adjunctRecipients = Collections.emptySet();
 
       // Partitioned region listener notification messages piggyback on this
       // operation's replyprocessor and need to be sent at the same time as
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
         recipients.removeAll(needsOldValueInCacheOp);
       }
 
-      Set cachelessNodes = Collections.EMPTY_SET;
-      Set adviseCacheServers = Collections.EMPTY_SET;
-      Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
-          new HashSet<InternalDistributedMember>();
+      Set cachelessNodes = Collections.emptySet();
+      Set adviseCacheServers;
+      Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
       if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
         cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
         if (!cachelessNodes.isEmpty()) {
           List list = new ArrayList(cachelessNodes);
           for (Object member : cachelessNodes) {
-            if (!recipients.contains(member)) {
+            if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
               // Don't include those originally excluded.
               list.remove(member);
-            } else if (adjunctRecipients.contains(member)) {
-              list.remove(member);
             }
           }
           cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
         if (!reliableOp || region.isNoDistributionOk()) {
           // nothing needs be done in this case
         } else {
-          region.handleReliableDistribution(Collections.EMPTY_SET);
+          region.handleReliableDistribution(Collections.emptySet());
         }
 
-        /** compute local client routing before waiting for an ack only for a bucket */
+        // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
           this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
       } else {
         boolean directAck = false;
         boolean useMulticast = region.getMulticastEnabled()
-            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+            && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
         boolean shouldAck = shouldAck();
 
         if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
                     recipients);
               }
               waitForMembers.removeAll(recipients);
-              recipients = Collections.EMPTY_SET;
+              recipients = Collections.emptySet();
             }
           }
           if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
           }
 
           adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
-          adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+          adviseCacheServers = ((Bucket) region).getPartitionedRegion()
               .getCacheDistributionAdvisor().adviseCacheServers();
           adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
 
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
           }
         }
 
-        /** compute local client routing before waiting for an ack only for a bucket */
+        // compute local client routing before waiting for an ack only for a bucket
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
           event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-
   /**
    * Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
    * key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
         continue;
       }
 
-      CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+      CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
           .getCacheDistributionAdvisor().getProfile(m);
 
       if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
         continue;
       }
 
-
       for (Object value : cf.filterProfile.getCqMap().values()) {
         ServerCQ cq = (ServerCQ) value;
 
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
           Long cqID = e.getKey();
           // For the CQs satisfying the event with destroy CQEvent, remove
           // the entry form CQ cache.
-          if (cq.getFilterID() == cqID
-              && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
-            cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+          if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+            cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
           }
         }
       }
     }
   }
 
-
   /**
    * Get the adjunct receivers for a partitioned region operation
    * 
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {
 
   /**
    * perform any operation-specific initialization on the given reply processor
-   * 
-   * @param p
-   * @param msg
    */
   protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
     // nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
     }
   }
 
-  /**
-   * @param closedMembers
-   */
   private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
       Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
     if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
       return null;
     }
     CacheDistributionAdvisor advisor;
-    // if (region.isUsedForPartitionedRegionBucket()) {
-    advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
-    // } else {
-    // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
-    // }
+    advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
     return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
   }
 
@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
     protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);
     protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
 
-
     private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
 
     public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
       return this.op;
     }
 
-
     /** sets the concurrency versioning tag for this message */
     public void setVersionTag(VersionTag tag) {
       this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
     /**
      * process a reply
      * 
-     * @param reply
-     * @param processor
      * @return true if the reply-processor should continue to process this response
      */
     boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
      * @param event the entry event that contains the old value
      */
     public void appendOldValueToMessage(EntryEventImpl event) {
-      {
-        @Unretained
-        Object val = event.getRawOldValue();
-        if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
-            || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
-          return;
-        }
+      @Unretained
+      Object val = event.getRawOldValue();
+      if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+          || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+        return;
       }
       event.exportOldValue(this);
     }
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {
 
     protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
       Assert.assertTrue(this.regionPath != null, "regionPath was null");
-      GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+      InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
       return gfc.getRegionByPathForProcessing(this.regionPath);
     }
 
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
         basicProcess(dm, lclRgn);
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
             // region
             if (!rgn.isEventTrackerInitialized()
                 && (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
-              if (logger.isDebugEnabled()) {
+              if (logger.isTraceEnabled()) {
                 logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
               }
               return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
           sendReply = operateOnRegion(event, dm) && sendReply;
         } finally {
           if (event instanceof EntryEventImpl) {
-            ((EntryEventImpl) event).release();
+            ((Releasable) event).release();
           }
         }
-      } catch (RegionDestroyedException e) {
+      } catch (RegionDestroyedException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Region destroyed: nothing to do", this);
         }
-      } catch (CancelException e) {
+      } catch (CancelException ignore) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
         if (!lclRgn.isDestroyed()) {
           logger.error("Got disk access exception, expected region to be destroyed", e);
         }
-      } catch (EntryNotFoundException e) {
+      } catch (EntryNotFoundException ignore) {
         this.appliedOperation = true;
         if (logger.isDebugEnabled()) {
           logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
       if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
         // distributed-no-ack message. Don't respond
       } else {
-        ReplyException exception = rex;
-        ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+        ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
             isInternal());
       }
     }
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
      * When an event is discarded because of an attempt to overwrite a more recent change we still
      * need to deliver that event to clients. Clients can then perform their own concurrency checks
      * on the event.
-     * 
-     * @param rgn
-     * @param ev
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation {
       rgn.notifyBridgeClients(ev);
     }
 
-    // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
-    // String path) {
-    // return LocalRegion.getRegionFromPath(sys, path);
-    // }
-
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
         throws EntryNotFoundException;
 
@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-      // super.fromData(in);
       short bits = in.readShort();
       short extBits = in.readShort();
       this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {
 
     @Override
     public void toData(DataOutput out) throws IOException {
-      // super.toData(out);
-
       short bits = 0;
       short extendedBits = 0;
       bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
   static class CacheOperationReplyProcessor extends DirectReplyProcessor {
     public CacheOperationMessage msg;
 
-    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
-        new CopyOnWriteHashSet<InternalDistributedMember>();
+    public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
 
     public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
       super(system, initMembers);