You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/09 06:02:44 UTC
geode git commit: fix-1
Repository: geode
Updated Branches:
refs/heads/feature/GEM-1353 b9e8ac598 -> 72bc82e02
fix-1
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/72bc82e0
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/72bc82e0
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/72bc82e0
Branch: refs/heads/feature/GEM-1353
Commit: 72bc82e02f4ffb475903fc2b528f0f3dc37c1f9a
Parents: b9e8ac5
Author: zhouxh <gz...@pivotal.io>
Authored: Sat Apr 8 23:02:32 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Sat Apr 8 23:02:32 2017 -0700
----------------------------------------------------------------------
.../internal/DistributionAdvisor.java | 7 ++++--
.../geode/internal/cache/BucketRegion.java | 12 ++++++++--
.../cache/DistributedCacheOperation.java | 14 ++++++++++--
.../cache/DistributedClearOperation.java | 19 +++++++++++++---
.../geode/internal/cache/DistributedRegion.java | 23 +++++++++++++++-----
.../wan/serial/SerialGatewaySenderQueue.java | 8 ++++++-
.../DistributedAckRegionCCEDUnitTest.java | 8 ++++++-
.../cache/query/cq/dunit/CqQueryDUnitTest.java | 8 ++++++-
8 files changed, 81 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 1d3dc86..903f10d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -159,13 +159,13 @@ public class DistributionAdvisor {
* the number of operations in-progress for previous versions of the profile set. Guarded by
* opCountLock
*/
- private long previousVersionOpCount;
+ public long previousVersionOpCount;
/**
* the number of operations in-progress for the current version of the profile set. Guarded by
* opCountLock
*/
- private long currentVersionOpCount;
+ public long currentVersionOpCount;
/**
* Hold onto removed profiles to compare to late-processed profiles. Fix for bug 36881. Protected
@@ -740,6 +740,7 @@ public class DistributionAdvisor {
}
synchronized (this.opCountLock) {
currentVersionOpCount++;
+ // logger.info("GGG:startOp:"+this.currentVersionOpCount, new Exception());
if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
logger.trace(LogMarker.STATE_FLUSH_OP, "StateFlush current opcount incremented: {}",
currentVersionOpCount);
@@ -759,12 +760,14 @@ public class DistributionAdvisor {
synchronized (this.opCountLock) {
if (version == membershipVersion) {
currentVersionOpCount--;
+ // logger.info("GGG:endOp:"+this.currentVersionOpCount, new Exception());
if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
logger.trace(LogMarker.STATE_FLUSH_OP, "StateFlush current opcount deccremented: {}",
currentVersionOpCount);
}
} else {
previousVersionOpCount--;
+ // logger.info("GGG:endOp2:" + this.previousVersionOpCount, new Exception());
if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
logger.trace(LogMarker.STATE_FLUSH_OP, "StateFlush previous opcount incremented: {}",
previousVersionOpCount);
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 03aa3ef..70ef226 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -1334,8 +1334,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
protected void distributeUpdateEntryVersionOperation(EntryEventImpl event) {
UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
- long viewVersion = op.startOperation();
+ long viewVersion = -1;
try {
+ viewVersion = op.startOperation();
op.distribute();
} finally {
op.endOperation(viewVersion);
@@ -1574,7 +1575,14 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
// Send out the destroy op to peers
- new DestroyRegionOperation(event, true).distribute();
+ DestroyRegionOperation dro = new DestroyRegionOperation(event, true);
+ long viewVersion = -1;
+ try {
+ viewVersion = dro.startOperation();
+ dro.distribute();
+ } finally {
+ dro.endOperation(viewVersion);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index a050a2b..00912d4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -250,18 +250,27 @@ public abstract class DistributedCacheOperation {
logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
viewVersion);
}
+ // logger.info("GGG:startOp:" + viewVersion + ":" + region.getFullPath() + ":"
+ // + region.getDistributionAdvisor().currentVersionOpCount + ":"
+ // + region.getDistributionAdvisor().previousVersionOpCount, new Exception());
return viewVersion;
}
public void endOperation(long viewVersion) {
DistributedRegion region = getRegion();
- if (viewVersion > 0) {
+ if (viewVersion != -1) {
region.getDistributionAdvisor().endOperation(viewVersion);
- viewVersion = -1;
if (logger.isDebugEnabled()) {
logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
viewVersion);
}
+ // logger.info("GGG:endOp:" + viewVersion + ":" + region.getFullPath() + ":"
+ // + region.getDistributionAdvisor().currentVersionOpCount + ":"
+ // + region.getDistributionAdvisor().previousVersionOpCount, new Exception());
+ } else {
+ // logger.info("GGG:ENDOP:" + viewVersion + ":" + region.getFullPath()
+ // + region.getDistributionAdvisor().currentVersionOpCount + ":"
+ // + region.getDistributionAdvisor().previousVersionOpCount, new Exception());
}
}
@@ -271,6 +280,7 @@ public abstract class DistributedCacheOperation {
*/
public void distribute() {
DistributedRegion region = getRegion();
+ // logger.info("GGG:" + region);
DM mgr = region.getDistributionManager();
boolean reliableOp = isOperationReliable() && region.requiresReliabilityCheck();
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
index 9d10fc1..e209d77 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedClearOperation.java
@@ -67,8 +67,15 @@ public class DistributedClearOperation extends DistributedCacheOperation {
**/
public static void clear(RegionEventImpl regionEvent, RegionVersionVector rvv,
Set<InternalDistributedMember> recipients) {
- new DistributedClearOperation(DistributedClearOperation.OperationType.OP_CLEAR, regionEvent,
- rvv, recipients).distribute();
+ long viewVersion = -1;
+ DistributedClearOperation op = new DistributedClearOperation(
+ DistributedClearOperation.OperationType.OP_CLEAR, regionEvent, rvv, recipients);
+ try {
+ viewVersion = op.startOperation();
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
/**
@@ -81,7 +88,13 @@ public class DistributedClearOperation extends DistributedCacheOperation {
Set<InternalDistributedMember> recipients) {
DistributedClearOperation dco = new DistributedClearOperation(
DistributedClearOperation.OperationType.OP_LOCK_FOR_CLEAR, regionEvent, null, recipients);
- dco.distribute();
+ long viewVersion = -1;
+ try {
+ viewVersion = dco.startOperation();
+ dco.distribute();
+ } finally {
+ dco.endOperation(viewVersion);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index bd9cdd9..affcfa7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -1684,9 +1684,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
// DR.destroy, it has notifiedGatewaySender ealier
+ long viewVersion = -1;
DestroyOperation op = new DestroyOperation(event);
- long viewVersion = op.startOperation();
try {
+ viewVersion = op.startOperation();
op.distribute();
} finally {
op.endOperation(viewVersion);
@@ -1746,8 +1747,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
*/
protected void distributeInvalidateRegion(RegionEventImpl event) {
InvalidateRegionOperation op = new InvalidateRegionOperation(event);
- long viewVersion = op.startOperation();
+ long viewVersion = -1;
try {
+ viewVersion = op.startOperation();
op.distribute();
} finally {
op.endOperation(viewVersion);
@@ -1800,9 +1802,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (persistenceAdvisor != null) {
persistenceAdvisor.releaseTieLock();
}
+ long viewVersion = -1;
DestroyRegionOperation op = new DestroyRegionOperation(event, notifyOfRegionDeparture);
- long viewVersion = op.startOperation();
try {
+ viewVersion = op.startOperation();
op.distribute();
} finally {
op.endOperation(viewVersion);
@@ -1884,9 +1887,10 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
// DR.invalidate, it has triggered callback earlier
+ long viewVersion = -1;
InvalidateOperation op = new InvalidateOperation(event);
- long viewVersion = op.startOperation();
try {
+ viewVersion = op.startOperation();
op.distribute();
} finally {
op.endOperation(viewVersion);
@@ -1923,8 +1927,9 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (event.isDistributed() && !event.isOriginRemote()) {
// DR has sent callback earlier
UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
- long viewVersion = op.startOperation();
+ long viewVersion = -1;
try {
+ viewVersion = op.startOperation();
op.distribute();
} finally {
op.endOperation(viewVersion);
@@ -2133,7 +2138,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
this.getCachePerfStats().incTombstoneGCCount();
EventID eventId = new EventID(getSystem());
DistributedTombstoneOperation gc = DistributedTombstoneOperation.gc(this, eventId);
- gc.distribute();
+ long viewVersion = -1;
+ try {
+ viewVersion = gc.startOperation();
+ gc.distribute();
+ } finally {
+ gc.endOperation(viewVersion);
+ }
notifyClientsOfTombstoneGC(getVersionVector().getTombstoneGCVector(), keysRemoved, eventId,
null);
return eventId;
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
index e6d54c5..435ad70 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java
@@ -1142,7 +1142,13 @@ public class SerialGatewaySenderQueue implements RegionQueue {
event.setTailKey(temp);
BatchDestroyOperation op = new BatchDestroyOperation(event);
- op.distribute();
+ long viewVersion = -1;
+ try {
+ viewVersion = op.startOperation();
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
if (logger.isDebugEnabled()) {
logger.debug("BatchRemovalThread completed destroy of keys from {} to {}",
lastDestroyedKey, temp);
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index dcb6cf3..ff37e36 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -254,7 +254,13 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
// this should update the controller's cache with the updated value but leave this cache
// alone
DistributedCacheOperation op = new UpdateOperation(event, tag.getVersionTimeStamp());
- op.distribute();
+ long viewVersion = -1;
+ try {
+ viewVersion = op.startOperation();
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
event.release();
}
});
http://git-wip-us.apache.org/repos/asf/geode/blob/72bc82e0/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
index e320ff1..558df48 100644
--- a/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
+++ b/geode-cq/src/test/java/org/apache/geode/cache/query/cq/dunit/CqQueryDUnitTest.java
@@ -1577,7 +1577,13 @@ public class CqQueryDUnitTest extends JUnit4CacheTestCase {
Region subregion = getCache().getRegion("root/" + regionName);
DistributedTombstoneOperation gc = DistributedTombstoneOperation
.gc((DistributedRegion) subregion, new EventID(getCache().getDistributedSystem()));
- gc.distribute();
+ long viewVersion = -1;
+ try {
+ viewVersion = gc.startOperation();
+ gc.distribute();
+ } finally {
+ gc.endOperation(viewVersion);
+ }
}
};
server.invoke(task);