You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/11 00:23:41 UTC

geode git commit: fix-3

Repository: geode
Updated Branches:
  refs/heads/feature/GEM-1353 e7ba045c7 -> b59370051


fix-3


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b5937005
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b5937005
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b5937005

Branch: refs/heads/feature/GEM-1353
Commit: b59370051f64a50c6719b8e4f12af6cf5d4c6b67
Parents: e7ba045
Author: zhouxh <gz...@pivotal.io>
Authored: Mon Apr 10 17:21:25 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Mon Apr 10 17:21:25 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegion.java      | 60 +++++++------------
 .../cache/DistributedCacheOperation.java        | 27 ++++++++-
 .../cache/DistributedClearOperation.java        | 19 +-----
 .../geode/internal/cache/DistributedRegion.java | 61 +++-----------------
 .../internal/cache/LocalRegionDataView.java     | 12 ++--
 .../wan/serial/SerialGatewaySenderQueue.java    |  8 +--
 .../DistributedAckRegionCCEDUnitTest.java       |  8 +--
 .../cache/query/cq/dunit/CqQueryDUnitTest.java  |  8 +--
 8 files changed, 67 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 70ef226..4e68520 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -577,7 +577,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) {
-    long viewVersion = -1;
+    long token = -1;
     UpdateOperation op = null;
 
     try {
@@ -589,8 +589,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
         } else {
           // BR's put
           op = new UpdateOperation(event, lastModified);
-          viewVersion = op.startOperation();
-          op.distribute();
+          token = op.startOperation();
           if (logger.isDebugEnabled()) {
             logger.debug("sent update operation : for region  : {}: with event: {}", this.getName(),
                 event);
@@ -602,7 +601,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       }
     } finally {
       if (op != null) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }
@@ -620,7 +619,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     // distribution *before* we do basicPutPart2.
     final long modifiedTime = event.getEventTime(lastModified);
 
-    long viewVersion = -1;
+    long token = -1;
     UpdateOperation op = null;
 
     try {
@@ -649,8 +648,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
           try {
             // PR's put PR
             op = new UpdateOperation(event, modifiedTime);
-            viewVersion = op.startOperation();
-            op.distribute();
+            token = op.startOperation();
           } finally {
             this.partitionedRegion.getPrStats().endSendReplication(start);
           }
@@ -665,7 +663,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       return lastModifiedTime;
     } finally {
       if (op != null) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }
@@ -911,20 +909,19 @@ public class BucketRegion extends DistributedRegion implements Bucket {
 
   protected void distributeInvalidateOperation(EntryEventImpl event) {
     InvalidateOperation op = null;
-    long viewVersion = -1;
+    long token = -1;
     try {
       if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
         // This cache has processed the event, forward operation
         // and event messages to backup buckets
         // BR.invalidate hasSeenEvent
         op = new InvalidateOperation(event);
-        viewVersion = op.startOperation();
-        op.distribute();
+        token = op.startOperation();
       }
       event.invokeCallbacks(this, true, false);
     } finally {
       if (op != null) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }
@@ -933,7 +930,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, boolean clearConflict,
       boolean invokeCallbacks) {
     // Assumed this is called with the entry synchronized
-    long viewVersion = -1;
+    long token = -1;
     InvalidateOperation op = null;
 
     try {
@@ -955,14 +952,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
         // distribute op to bucket secondaries and event to other listeners
         // BR's invalidate
         op = new InvalidateOperation(event);
-        viewVersion = op.startOperation();
-        op.distribute();
+        token = op.startOperation();
       }
       super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
           invokeCallbacks);
     } finally {
       if (op != null) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }
@@ -1180,7 +1176,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeDestroyOperation(EntryEventImpl event) {
-    long viewVersion = -1;
+    long token = -1;
     DestroyOperation op = null;
 
     try {
@@ -1198,8 +1194,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
           // BR's destroy, not to trigger callback here
           event.setOldValueFromRegion();
           op = new DestroyOperation(event);
-          viewVersion = op.startOperation();
-          op.distribute();
+          token = op.startOperation();
         }
       }
 
@@ -1208,14 +1203,14 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       }
     } finally {
       if (op != null) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }
 
   @Override
   protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
-    long viewVersion = -1;
+    long token = -1;
     DestroyOperation op = null;
     try {
       // Assumed this is called with entry synchrony
@@ -1237,13 +1232,12 @@ public class BucketRegion extends DistributedRegion implements Bucket {
         // This code assumes that this bucket is primary
         // BR.destroy for retain
         op = new DestroyOperation(event);
-        viewVersion = op.startOperation();
-        op.distribute();
+        token = op.startOperation();
       }
       super.basicDestroyBeforeRemoval(entry, event);
     } finally {
       if (op != null) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }
@@ -1333,14 +1327,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeUpdateEntryVersionOperation(EntryEventImpl event) {
-    UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
-    long viewVersion = -1;
-    try {
-      viewVersion = op.startOperation();
-      op.distribute();
-    } finally {
-      op.endOperation(viewVersion);
-    }
+    new UpdateEntryVersionOperation(event).distribute();
   }
 
   public int getRedundancyLevel() {
@@ -1575,14 +1562,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       }
 
       // Send out the destroy op to peers
-      DestroyRegionOperation dro = new DestroyRegionOperation(event, true);
-      long viewVersion = -1;
-      try {
-        viewVersion = dro.startOperation();
-        dro.distribute();
-      } finally {
-        dro.endOperation(viewVersion);
-      }
+      new DestroyRegionOperation(event, true).distribute();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index b77c80c..86063a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -240,6 +240,12 @@ public abstract class DistributedCacheOperation {
     return true;
   }
 
+  /**
+   * Marks on the region's distribution advisor that a distribution is about to start, then
+   * distributes the operation. Returns a token, which is the view version; a return value of -1
+   * means that no view version was obtained. This method performs the distribution itself, so
+   * callers should pair it with endOperation() in a try/finally block.
+   */
   public long startOperation() {
     DistributedRegion region = getRegion();
     long viewVersion = -1;
@@ -250,9 +256,14 @@ public abstract class DistributedCacheOperation {
       logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
           viewVersion);
     }
+    _distribute();
     return viewVersion;
   }
 
+  /**
+   * Marks on the region's distribution advisor that a distribution has ended. This method
+   * should be paired with startOperation() in a try/finally block.
+   */
   public void endOperation(long viewVersion) {
     DistributedRegion region = getRegion();
     if (viewVersion != -1) {
@@ -269,8 +280,22 @@ public abstract class DistributedCacheOperation {
    * who the recipients are and handles careful delivery of the operation to those members.
    */
   public void distribute() {
+    long token = -1;
+    try {
+      token = startOperation();
+    } finally {
+      endOperation(token);
+    }
+  }
+
+  /**
+   * About to distribute a cache operation to other members of the distributed system. This method
+   * determines who the recipients are and handles careful delivery of the operation to those
+   * members. This method should be wrapped by startOperation() and endOperation() in a
+   * try/finally block.
+   */
+  private void _distribute() {
     DistributedRegion region = getRegion();
-    // logger.info("GGG:" + region);
     DM mgr = region.getDistributionManager();
     boolean reliableOp = isOperationReliable() && region.requiresReliabilityCheck();
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index e209d77..9d10fc1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -67,15 +67,8 @@ public class DistributedClearOperation extends DistributedCacheOperation {
    **/
   public static void clear(RegionEventImpl regionEvent, RegionVersionVector rvv,
       Set<InternalDistributedMember> recipients) {
-    long viewVersion = -1;
-    DistributedClearOperation op = new DistributedClearOperation(
-        DistributedClearOperation.OperationType.OP_CLEAR, regionEvent, rvv, recipients);
-    try {
-      viewVersion = op.startOperation();
-      op.distribute();
-    } finally {
-      op.endOperation(viewVersion);
-    }
+    new DistributedClearOperation(DistributedClearOperation.OperationType.OP_CLEAR, regionEvent,
+        rvv, recipients).distribute();
   }
 
   /**
@@ -88,13 +81,7 @@ public class DistributedClearOperation extends DistributedCacheOperation {
       Set<InternalDistributedMember> recipients) {
     DistributedClearOperation dco = new DistributedClearOperation(
         DistributedClearOperation.OperationType.OP_LOCK_FOR_CLEAR, regionEvent, null, recipients);
-    long viewVersion = -1;
-    try {
-      viewVersion = dco.startOperation();
-      dco.distribute();
-    } finally {
-      dco.endOperation(viewVersion);
-    }
+    dco.distribute();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index affcfa7..ed1a2fe 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -439,16 +439,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
       if (distribute) {
         // DR's put, it has notified gateway sender earlier
         UpdateOperation op = new UpdateOperation(event, lastModified);
-        long viewVersion = op.startOperation();
         if (logger.isTraceEnabled()) {
           logger.trace("distributing operation for event : {} : for region : {}", event,
               this.getName());
         }
-        try {
-          op.distribute();
-        } finally {
-          op.endOperation(viewVersion);
-        }
+        op.distribute();
       }
     }
   }
@@ -1684,14 +1679,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
       boolean distribute = !event.getInhibitDistribution();
       if (distribute) {
         // DR.destroy, it has notifiedGatewaySender ealier
-        long viewVersion = -1;
         DestroyOperation op = new DestroyOperation(event);
-        try {
-          viewVersion = op.startOperation();
-          op.distribute();
-        } finally {
-          op.endOperation(viewVersion);
-        }
+        op.distribute();
       }
     }
   }
@@ -1746,14 +1735,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
    * @since GemFire 5.7
    */
   protected void distributeInvalidateRegion(RegionEventImpl event) {
-    InvalidateRegionOperation op = new InvalidateRegionOperation(event);
-    long viewVersion = -1;
-    try {
-      viewVersion = op.startOperation();
-      op.distribute();
-    } finally {
-      op.endOperation(viewVersion);
-    }
+    new InvalidateRegionOperation(event).distribute();
   }
 
   /**
@@ -1802,14 +1784,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     if (persistenceAdvisor != null) {
       persistenceAdvisor.releaseTieLock();
     }
-    long viewVersion = -1;
-    DestroyRegionOperation op = new DestroyRegionOperation(event, notifyOfRegionDeparture);
-    try {
-      viewVersion = op.startOperation();
-      op.distribute();
-    } finally {
-      op.endOperation(viewVersion);
-    }
+    new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
   }
 
   /**
@@ -1887,14 +1862,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
         boolean distribute = !event.getInhibitDistribution();
         if (distribute) {
           // DR.invalidate, it has triggered callback earlier
-          long viewVersion = -1;
           InvalidateOperation op = new InvalidateOperation(event);
-          try {
-            viewVersion = op.startOperation();
-            op.distribute();
-          } finally {
-            op.endOperation(viewVersion);
-          }
+          op.distribute();
         }
       }
     }
@@ -1927,13 +1896,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
       if (event.isDistributed() && !event.isOriginRemote()) {
         // DR has sent callback earlier
         UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
-        long viewVersion = -1;
-        try {
-          viewVersion = op.startOperation();
-          op.distribute();
-        } finally {
-          op.endOperation(viewVersion);
-        }
+        op.distribute();
       }
     }
   }
@@ -2138,13 +2101,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     this.getCachePerfStats().incTombstoneGCCount();
     EventID eventId = new EventID(getSystem());
     DistributedTombstoneOperation gc = DistributedTombstoneOperation.gc(this, eventId);
-    long viewVersion = -1;
-    try {
-      viewVersion = gc.startOperation();
-      gc.distribute();
-    } finally {
-      gc.endOperation(viewVersion);
-    }
+    gc.distribute();
     notifyClientsOfTombstoneGC(getVersionVector().getTombstoneGCVector(), keysRemoved, eventId,
         null);
     return eventId;
@@ -3393,7 +3350,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
   public void postPutAllSend(DistributedPutAllOperation putAllOp,
       VersionedObjectList successfulPuts) {
     if (putAllOp.putAllDataSize > 0) {
-      putAllOp.distribute();
+      putAllOp.startOperation();
     } else {
       if (logger.isDebugEnabled()) {
         logger.debug("DR.postPutAll: no data to distribute");
@@ -3405,7 +3362,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
   public void postRemoveAllSend(DistributedRemoveAllOperation op,
       VersionedObjectList successfulOps) {
     if (op.removeAllDataSize > 0) {
-      op.distribute();
+      op.startOperation();
     } else {
       getCache().getLoggerI18n().fine("DR.postRemoveAll: no data to distribute");
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 3d7418f..6d415d5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -312,16 +312,16 @@ public class LocalRegionDataView implements InternalDataView {
       putallOp.fillVersionedObjectList(successfulPuts);
     }
     // BR & DR's putAll
-    long viewVersion = -1;
+    long token = -1;
     try {
       if (region instanceof DistributedRegion) {
-        viewVersion = putallOp.startOperation();
+        token = putallOp.startOperation();
       }
       region.postPutAllSend(putallOp, successfulPuts);
       region.postPutAllFireEvents(putallOp, successfulPuts);
     } finally {
       if (region instanceof DistributedRegion) {
-        putallOp.endOperation(viewVersion);
+        putallOp.endOperation(token);
       }
     }
   }
@@ -337,16 +337,16 @@ public class LocalRegionDataView implements InternalDataView {
       op.fillVersionedObjectList(successfulOps);
     }
     // BR, DR's removeAll
-    long viewVersion = -1;
+    long token = -1;
     try {
       if (region instanceof DistributedRegion) {
-        viewVersion = op.startOperation();
+        token = op.startOperation();
       }
       region.postRemoveAllSend(op, successfulOps);
       region.postRemoveAllFireEvents(op, successfulOps);
     } finally {
       if (region instanceof DistributedRegion) {
-        op.endOperation(viewVersion);
+        op.endOperation(token);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 435ad70..e6d54c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1142,13 +1142,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
             event.setTailKey(temp);
 
             BatchDestroyOperation op = new BatchDestroyOperation(event);
-            long viewVersion = -1;
-            try {
-              viewVersion = op.startOperation();
-              op.distribute();
-            } finally {
-              op.endOperation(viewVersion);
-            }
+            op.distribute();
             if (logger.isDebugEnabled()) {
               logger.debug("BatchRemovalThread completed destroy of keys from {} to {}",
                   lastDestroyedKey, temp);

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index ff37e36..dcb6cf3 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -254,13 +254,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
         // this should update the controller's cache with the updated value but leave this cache
         // alone
         DistributedCacheOperation op = new UpdateOperation(event, tag.getVersionTimeStamp());
-        long viewVersion = -1;
-        try {
-          viewVersion = op.startOperation();
-          op.distribute();
-        } finally {
-          op.endOperation(viewVersion);
-        }
+        op.distribute();
         event.release();
       }
     });

http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
index 558df48..e320ff1 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
@@ -1577,13 +1577,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
         Region subregion = getCache().getRegion("root/" + regionName);
         DistributedTombstoneOperation gc = DistributedTombstoneOperation
             .gc((DistributedRegion) subregion, new EventID(getCache().getDistributedSystem()));
-        long viewVersion = -1;
-        try {
-          viewVersion = gc.startOperation();
-          gc.distribute();
-        } finally {
-          gc.endOperation(viewVersion);
-        }
+        gc.distribute();
       }
     };
     server.invoke(task);