You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/11 00:23:41 UTC
geode git commit: fix-3
Repository: geode
Updated Branches:
refs/heads/feature/GEM-1353 e7ba045c7 -> b59370051
fix-3
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b5937005
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b5937005
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b5937005
Branch: refs/heads/feature/GEM-1353
Commit: b59370051f64a50c6719b8e4f12af6cf5d4c6b67
Parents: e7ba045
Author: zhouxh <gz...@pivotal.io>
Authored: Mon Apr 10 17:21:25 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Mon Apr 10 17:21:25 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketRegion.java | 60 +++++++------------
.../cache/DistributedCacheOperation.java | 27 ++++++++-
.../cache/DistributedClearOperation.java | 19 +-----
.../geode/internal/cache/DistributedRegion.java | 61 +++-----------------
.../internal/cache/LocalRegionDataView.java | 12 ++--
.../wan/serial/SerialGatewaySenderQueue.java | 8 +--
.../DistributedAckRegionCCEDUnitTest.java | 8 +--
.../cache/query/cq/dunit/CqQueryDUnitTest.java | 8 +--
8 files changed, 67 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 70ef226..4e68520 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -577,7 +577,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) {
- long viewVersion = -1;
+ long token = -1;
UpdateOperation op = null;
try {
@@ -589,8 +589,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
} else {
// BR's put
op = new UpdateOperation(event, lastModified);
- viewVersion = op.startOperation();
- op.distribute();
+ token = op.startOperation();
if (logger.isDebugEnabled()) {
logger.debug("sent update operation : for region : {}: with event: {}", this.getName(),
event);
@@ -602,7 +601,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
} finally {
if (op != null) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
@@ -620,7 +619,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// distribution *before* we do basicPutPart2.
final long modifiedTime = event.getEventTime(lastModified);
- long viewVersion = -1;
+ long token = -1;
UpdateOperation op = null;
try {
@@ -649,8 +648,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
try {
// PR's put PR
op = new UpdateOperation(event, modifiedTime);
- viewVersion = op.startOperation();
- op.distribute();
+ token = op.startOperation();
} finally {
this.partitionedRegion.getPrStats().endSendReplication(start);
}
@@ -665,7 +663,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
return lastModifiedTime;
} finally {
if (op != null) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
@@ -911,20 +909,19 @@ public class BucketRegion extends DistributedRegion implements Bucket {
protected void distributeInvalidateOperation(EntryEventImpl event) {
InvalidateOperation op = null;
- long viewVersion = -1;
+ long token = -1;
try {
if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
// This cache has processed the event, forward operation
// and event messages to backup buckets
// BR.invalidate hasSeenEvent
op = new InvalidateOperation(event);
- viewVersion = op.startOperation();
- op.distribute();
+ token = op.startOperation();
}
event.invokeCallbacks(this, true, false);
} finally {
if (op != null) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
@@ -933,7 +930,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, boolean clearConflict,
boolean invokeCallbacks) {
// Assumed this is called with the entry synchronized
- long viewVersion = -1;
+ long token = -1;
InvalidateOperation op = null;
try {
@@ -955,14 +952,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// distribute op to bucket secondaries and event to other listeners
// BR's invalidate
op = new InvalidateOperation(event);
- viewVersion = op.startOperation();
- op.distribute();
+ token = op.startOperation();
}
super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
invokeCallbacks);
} finally {
if (op != null) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
@@ -1180,7 +1176,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeDestroyOperation(EntryEventImpl event) {
- long viewVersion = -1;
+ long token = -1;
DestroyOperation op = null;
try {
@@ -1198,8 +1194,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// BR's destroy, not to trigger callback here
event.setOldValueFromRegion();
op = new DestroyOperation(event);
- viewVersion = op.startOperation();
- op.distribute();
+ token = op.startOperation();
}
}
@@ -1208,14 +1203,14 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
} finally {
if (op != null) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
@Override
protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
- long viewVersion = -1;
+ long token = -1;
DestroyOperation op = null;
try {
// Assumed this is called with entry synchrony
@@ -1237,13 +1232,12 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// This code assumes that this bucket is primary
// BR.destroy for retain
op = new DestroyOperation(event);
- viewVersion = op.startOperation();
- op.distribute();
+ token = op.startOperation();
}
super.basicDestroyBeforeRemoval(entry, event);
} finally {
if (op != null) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
@@ -1333,14 +1327,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeUpdateEntryVersionOperation(EntryEventImpl event) {
- UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
- long viewVersion = -1;
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ new UpdateEntryVersionOperation(event).distribute();
}
public int getRedundancyLevel() {
@@ -1575,14 +1562,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
// Send out the destroy op to peers
- DestroyRegionOperation dro = new DestroyRegionOperation(event, true);
- long viewVersion = -1;
- try {
- viewVersion = dro.startOperation();
- dro.distribute();
- } finally {
- dro.endOperation(viewVersion);
- }
+ new DestroyRegionOperation(event, true).distribute();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index b77c80c..86063a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -240,6 +240,12 @@ public abstract class DistributedCacheOperation {
return true;
}
+ /**
+ * region's distribution advisor marked that a distribution is about to start, then distribute. It
+ * returns a token, which is view version. Return -1 means the method did not succeed. This method
+ * must be invoked before toDistribute(). This method should pair with endOperation() in
+ * try/finally block.
+ */
public long startOperation() {
DistributedRegion region = getRegion();
long viewVersion = -1;
@@ -250,9 +256,14 @@ public abstract class DistributedCacheOperation {
logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
viewVersion);
}
+ _distribute();
return viewVersion;
}
+ /**
+ * region's distribution advisor marked that a distribution is ended. This method should pair with
+ * startOperation in try/finally block.
+ */
public void endOperation(long viewVersion) {
DistributedRegion region = getRegion();
if (viewVersion != -1) {
@@ -269,8 +280,22 @@ public abstract class DistributedCacheOperation {
* who the recipients are and handles careful delivery of the operation to those members.
*/
public void distribute() {
+ long token = -1;
+ try {
+ token = startOperation();
+ } finally {
+ endOperation(token);
+ }
+ }
+
+ /**
+ * About to distribute a cache operation to other members of the distributed system. This method
+ * determines who the recipients are and handles careful delivery of the operation to those
+ * members. This method should wrapped by startOperation() and endOperation() in try/finally
+ * block.
+ */
+ private void _distribute() {
DistributedRegion region = getRegion();
- // logger.info("GGG:" + region);
DM mgr = region.getDistributionManager();
boolean reliableOp = isOperationReliable() && region.requiresReliabilityCheck();
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index e209d77..9d10fc1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -67,15 +67,8 @@ public class DistributedClearOperation extends DistributedCacheOperation {
**/
public static void clear(RegionEventImpl regionEvent, RegionVersionVector rvv,
Set<InternalDistributedMember> recipients) {
- long viewVersion = -1;
- DistributedClearOperation op = new DistributedClearOperation(
- DistributedClearOperation.OperationType.OP_CLEAR, regionEvent, rvv, recipients);
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ new DistributedClearOperation(DistributedClearOperation.OperationType.OP_CLEAR, regionEvent,
+ rvv, recipients).distribute();
}
/**
@@ -88,13 +81,7 @@ public class DistributedClearOperation extends DistributedCacheOperation {
Set<InternalDistributedMember> recipients) {
DistributedClearOperation dco = new DistributedClearOperation(
DistributedClearOperation.OperationType.OP_LOCK_FOR_CLEAR, regionEvent, null, recipients);
- long viewVersion = -1;
- try {
- viewVersion = dco.startOperation();
- dco.distribute();
- } finally {
- dco.endOperation(viewVersion);
- }
+ dco.distribute();
}
@Override
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index affcfa7..ed1a2fe 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -439,16 +439,11 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (distribute) {
// DR's put, it has notified gateway sender earlier
UpdateOperation op = new UpdateOperation(event, lastModified);
- long viewVersion = op.startOperation();
if (logger.isTraceEnabled()) {
logger.trace("distributing operation for event : {} : for region : {}", event,
this.getName());
}
- try {
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ op.distribute();
}
}
}
@@ -1684,14 +1679,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
// DR.destroy, it has notifiedGatewaySender ealier
- long viewVersion = -1;
DestroyOperation op = new DestroyOperation(event);
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ op.distribute();
}
}
}
@@ -1746,14 +1735,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* @since GemFire 5.7
*/
protected void distributeInvalidateRegion(RegionEventImpl event) {
- InvalidateRegionOperation op = new InvalidateRegionOperation(event);
- long viewVersion = -1;
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ new InvalidateRegionOperation(event).distribute();
}
/**
@@ -1802,14 +1784,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (persistenceAdvisor != null) {
persistenceAdvisor.releaseTieLock();
}
- long viewVersion = -1;
- DestroyRegionOperation op = new DestroyRegionOperation(event, notifyOfRegionDeparture);
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
}
/**
@@ -1887,14 +1862,8 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
// DR.invalidate, it has triggered callback earlier
- long viewVersion = -1;
InvalidateOperation op = new InvalidateOperation(event);
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ op.distribute();
}
}
}
@@ -1927,13 +1896,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (event.isDistributed() && !event.isOriginRemote()) {
// DR has sent callback earlier
UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
- long viewVersion = -1;
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ op.distribute();
}
}
}
@@ -2138,13 +2101,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
this.getCachePerfStats().incTombstoneGCCount();
EventID eventId = new EventID(getSystem());
DistributedTombstoneOperation gc = DistributedTombstoneOperation.gc(this, eventId);
- long viewVersion = -1;
- try {
- viewVersion = gc.startOperation();
- gc.distribute();
- } finally {
- gc.endOperation(viewVersion);
- }
+ gc.distribute();
notifyClientsOfTombstoneGC(getVersionVector().getTombstoneGCVector(), keysRemoved, eventId,
null);
return eventId;
@@ -3393,7 +3350,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
public void postPutAllSend(DistributedPutAllOperation putAllOp,
VersionedObjectList successfulPuts) {
if (putAllOp.putAllDataSize > 0) {
- putAllOp.distribute();
+ putAllOp.startOperation();
} else {
if (logger.isDebugEnabled()) {
logger.debug("DR.postPutAll: no data to distribute");
@@ -3405,7 +3362,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
public void postRemoveAllSend(DistributedRemoveAllOperation op,
VersionedObjectList successfulOps) {
if (op.removeAllDataSize > 0) {
- op.distribute();
+ op.startOperation();
} else {
getCache().getLoggerI18n().fine("DR.postRemoveAll: no data to distribute");
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 3d7418f..6d415d5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -312,16 +312,16 @@ public class LocalRegionDataView implements InternalDataView {
putallOp.fillVersionedObjectList(successfulPuts);
}
// BR & DR's putAll
- long viewVersion = -1;
+ long token = -1;
try {
if (region instanceof DistributedRegion) {
- viewVersion = putallOp.startOperation();
+ token = putallOp.startOperation();
}
region.postPutAllSend(putallOp, successfulPuts);
region.postPutAllFireEvents(putallOp, successfulPuts);
} finally {
if (region instanceof DistributedRegion) {
- putallOp.endOperation(viewVersion);
+ putallOp.endOperation(token);
}
}
}
@@ -337,16 +337,16 @@ public class LocalRegionDataView implements InternalDataView {
op.fillVersionedObjectList(successfulOps);
}
// BR, DR's removeAll
- long viewVersion = -1;
+ long token = -1;
try {
if (region instanceof DistributedRegion) {
- viewVersion = op.startOperation();
+ token = op.startOperation();
}
region.postRemoveAllSend(op, successfulOps);
region.postRemoveAllFireEvents(op, successfulOps);
} finally {
if (region instanceof DistributedRegion) {
- op.endOperation(viewVersion);
+ op.endOperation(token);
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index 435ad70..e6d54c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1142,13 +1142,7 @@ public class SerialGatewaySenderQueue implements RegionQueue {
event.setTailKey(temp);
BatchDestroyOperation op = new BatchDestroyOperation(event);
- long viewVersion = -1;
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ op.distribute();
if (logger.isDebugEnabled()) {
logger.debug("BatchRemovalThread completed destroy of keys from {} to {}",
lastDestroyedKey, temp);
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index ff37e36..dcb6cf3 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -254,13 +254,7 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
// this should update the controller's cache with the updated value but leave this cache
// alone
DistributedCacheOperation op = new UpdateOperation(event, tag.getVersionTimeStamp());
- long viewVersion = -1;
- try {
- viewVersion = op.startOperation();
- op.distribute();
- } finally {
- op.endOperation(viewVersion);
- }
+ op.distribute();
event.release();
}
});
http://git-wip-us.apache.org/repos/asf/geode/blob/b5937005/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
index 558df48..e320ff1 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
@@ -1577,13 +1577,7 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
Region subregion = getCache().getRegion("root/" + regionName);
DistributedTombstoneOperation gc = DistributedTombstoneOperation
.gc((DistributedRegion) subregion, new EventID(getCache().getDistributedSystem()));
- long viewVersion = -1;
- try {
- viewVersion = gc.startOperation();
- gc.distribute();
- } finally {
- gc.endOperation(viewVersion);
- }
+ gc.distribute();
}
};
server.invoke(task);