You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/08 01:44:05 UTC
geode git commit: GEM-1353
Repository: geode
Updated Branches:
refs/heads/feature/GEM-1353 799548ee4 -> b9e8ac598
GEM-1353
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b9e8ac59
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b9e8ac59
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b9e8ac59
Branch: refs/heads/feature/GEM-1353
Commit: b9e8ac59892aed7f2ddea62f54b07ddac688456e
Parents: 799548e
Author: zhouxh <gz...@pivotal.io>
Authored: Fri Apr 7 18:43:43 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Fri Apr 7 18:43:43 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/BucketRegion.java | 272 ++++++++++++-------
.../cache/DistributedCacheOperation.java | 46 ++--
.../geode/internal/cache/DistributedRegion.java | 49 +++-
.../internal/cache/LocalRegionDataView.java | 20 +-
4 files changed, 259 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d92ddab..03aa3ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -577,21 +577,33 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) {
- if (!event.isOriginRemote() && !event.isNetSearch() && getBucketAdvisor().isPrimary()) {
- if (event.isBulkOpInProgress()) {
- // consolidate the UpdateOperation for each entry into a PutAllMessage
- // since we did not call basicPutPart3(), so we have to explicitly addEntry here
- event.getPutAllOperation().addEntry(event, this.getId());
- } else {
- new UpdateOperation(event, lastModified).distribute();
- if (logger.isDebugEnabled()) {
- logger.debug("sent update operation : for region : {}: with event: {}", this.getName(),
- event);
+ long viewVersion = -1;
+ UpdateOperation op = null;
+
+ try {
+ if (!event.isOriginRemote() && !event.isNetSearch() && getBucketAdvisor().isPrimary()) {
+ if (event.isBulkOpInProgress()) {
+ // consolidate the UpdateOperation for each entry into a PutAllMessage
+ // since we did not call basicPutPart3(), so we have to explicitly addEntry here
+ event.getPutAllOperation().addEntry(event, this.getId());
+ } else {
+ // BR's put
+ op = new UpdateOperation(event, lastModified);
+ viewVersion = op.startOperation();
+ op.distribute();
+ if (logger.isDebugEnabled()) {
+ logger.debug("sent update operation : for region : {}: with event: {}", this.getName(),
+ event);
+ }
}
}
- }
- if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
- event.invokeCallbacks(this, true, true);
+ if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
+ event.invokeCallbacks(this, true, true);
+ }
+ } finally {
+ if (op != null) {
+ op.endOperation(viewVersion);
+ }
}
}
@@ -607,40 +619,55 @@ public class BucketRegion extends DistributedRegion implements Bucket {
// timestamp returned from basicPutPart2, but as a bucket we want to do
// distribution *before* we do basicPutPart2.
final long modifiedTime = event.getEventTime(lastModified);
- // Update the get stats if necessary.
- if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
- updateStatsForGet(entry, true);
- }
- if (!event.isOriginRemote()) {
- if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
- boolean eventHasDelta = event.getDeltaBytes() != null;
- VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
- if (v != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("generated version tag {} in region {}", v, this.getName());
+
+ long viewVersion = -1;
+ UpdateOperation op = null;
+
+ try {
+ // Update the get stats if necessary.
+ if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
+ updateStatsForGet(entry, true);
+ }
+ if (!event.isOriginRemote()) {
+ if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+ boolean eventHasDelta = event.getDeltaBytes() != null;
+ VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
+ if (v != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("generated version tag {} in region {}", v, this.getName());
+ }
}
}
- }
- // This code assumes it is safe ignore token mode (GII in progress)
- // because it assumes when the origin of the event is local,
- // the GII has completed and the region is initialized and open for local
- // ops
- if (!event.isBulkOpInProgress()) {
- long start = this.partitionedRegion.getPrStats().startSendReplication();
- try {
- UpdateOperation op = new UpdateOperation(event, modifiedTime);
- op.distribute();
- } finally {
- this.partitionedRegion.getPrStats().endSendReplication(start);
+ // This code assumes it is safe ignore token mode (GII in progress)
+ // because it assumes when the origin of the event is local,
+ // the GII has completed and the region is initialized and open for local
+ // ops
+
+ if (!event.isBulkOpInProgress()) {
+ long start = this.partitionedRegion.getPrStats().startSendReplication();
+ try {
+ // PR's put PR
+ op = new UpdateOperation(event, modifiedTime);
+ viewVersion = op.startOperation();
+ op.distribute();
+ } finally {
+ this.partitionedRegion.getPrStats().endSendReplication(start);
+ }
+ } else {
+ // consolidate the UpdateOperation for each entry into a PutAllMessage
+ // basicPutPart3 takes care of this
}
- } else {
- // consolidate the UpdateOperation for each entry into a PutAllMessage
- // basicPutPart3 takes care of this
}
- }
- return super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+ long lastModifiedTime =
+ super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+ return lastModifiedTime;
+ } finally {
+ if (op != null) {
+ op.endOperation(viewVersion);
+ }
+ }
}
protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
@@ -883,39 +910,61 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeInvalidateOperation(EntryEventImpl event) {
- if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
- // This cache has processed the event, forward operation
- // and event messages to backup buckets
- new InvalidateOperation(event).distribute();
+ InvalidateOperation op = null;
+ long viewVersion = -1;
+ try {
+ if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+ // This cache has processed the event, forward operation
+ // and event messages to backup buckets
+ // BR.invalidate hasSeenEvent
+ op = new InvalidateOperation(event);
+ viewVersion = op.startOperation();
+ op.distribute();
+ }
+ event.invokeCallbacks(this, true, false);
+ } finally {
+ if (op != null) {
+ op.endOperation(viewVersion);
+ }
}
- event.invokeCallbacks(this, true, false);
}
@Override
void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, boolean clearConflict,
boolean invokeCallbacks) {
// Assumed this is called with the entry synchronized
- if (!event.isOriginRemote()) {
- if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
- VersionTag v = re.generateVersionTag(null, false, this, event);
- if (logger.isDebugEnabled() && v != null) {
- logger.debug("generated version tag {} in region {}", v, this.getName());
- }
- event.setVersionTag(v);
- }
+ long viewVersion = -1;
+ InvalidateOperation op = null;
- // This code assumes it is safe ignore token mode (GII in progress)
- // because it assumes when the origin of the event is local,
- // the GII has completed and the region is initialized and open for local
- // ops
+ try {
+ if (!event.isOriginRemote()) {
+ if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+ VersionTag v = re.generateVersionTag(null, false, this, event);
+ if (logger.isDebugEnabled() && v != null) {
+ logger.debug("generated version tag {} in region {}", v, this.getName());
+ }
+ event.setVersionTag(v);
+ }
- // This code assumes that this bucket is primary
- // distribute op to bucket secondaries and event to other listeners
- InvalidateOperation op = new InvalidateOperation(event);
- op.distribute();
+ // This code assumes it is safe ignore token mode (GII in progress)
+ // because it assumes when the origin of the event is local,
+ // the GII has completed and the region is initialized and open for local
+ // ops
+
+ // This code assumes that this bucket is primary
+ // distribute op to bucket secondaries and event to other listeners
+ // BR's invalidate
+ op = new InvalidateOperation(event);
+ viewVersion = op.startOperation();
+ op.distribute();
+ }
+ super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
+ invokeCallbacks);
+ } finally {
+ if (op != null) {
+ op.endOperation(viewVersion);
+ }
}
- super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
- invokeCallbacks);
}
@Override
@@ -1131,49 +1180,72 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeDestroyOperation(EntryEventImpl event) {
- if (logger.isTraceEnabled(LogMarker.DM)) {
- logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}",
- event);
- }
- if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
- if (event.isBulkOpInProgress()) {
- // consolidate the DestroyOperation for each entry into a RemoveAllMessage
- event.getRemoveAllOperation().addEntry(event, this.getId());
- } else {
- // This cache has processed the event, forward operation
- // and event messages to backup buckets
- event.setOldValueFromRegion();
- new DestroyOperation(event).distribute();
+ long viewVersion = -1;
+ DestroyOperation op = null;
+
+ try {
+ if (logger.isTraceEnabled(LogMarker.DM)) {
+ logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}",
+ event);
+ }
+ if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+ if (event.isBulkOpInProgress()) {
+ // consolidate the DestroyOperation for each entry into a RemoveAllMessage
+ event.getRemoveAllOperation().addEntry(event, this.getId());
+ } else {
+ // This cache has processed the event, forward operation
+ // and event messages to backup buckets
+ // BR's destroy, not to trigger callback here
+ event.setOldValueFromRegion();
+ op = new DestroyOperation(event);
+ viewVersion = op.startOperation();
+ op.distribute();
+ }
}
- }
- if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
- event.invokeCallbacks(this, true, false);
+ if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
+ event.invokeCallbacks(this, true, false);
+ }
+ } finally {
+ if (op != null) {
+ op.endOperation(viewVersion);
+ }
}
}
@Override
protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
- // Assumed this is called with entry synchrony
- if (!event.isOriginRemote() && !event.isBulkOpInProgress() && !event.getOperation().isLocal()
- && !Operation.EVICT_DESTROY.equals(event.getOperation())
- && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
-
- if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
- VersionTag v = entry.generateVersionTag(null, false, this, event);
- if (logger.isDebugEnabled() && v != null) {
- logger.debug("generated version tag {} in region {}", v, this.getName());
+ long viewVersion = -1;
+ DestroyOperation op = null;
+ try {
+ // Assumed this is called with entry synchrony
+ if (!event.isOriginRemote() && !event.isBulkOpInProgress() && !event.getOperation().isLocal()
+ && !Operation.EVICT_DESTROY.equals(event.getOperation())
+ && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
+
+ if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+ VersionTag v = entry.generateVersionTag(null, false, this, event);
+ if (logger.isDebugEnabled() && v != null) {
+ logger.debug("generated version tag {} in region {}", v, this.getName());
+ }
}
- }
- // This code assumes it is safe ignore token mode (GII in progress)
- // because it assume when the origin of the event is local,
- // then GII has completed (the region has been completely initialized)
+ // This code assumes it is safe ignore token mode (GII in progress)
+ // because it assume when the origin of the event is local,
+ // then GII has completed (the region has been completely initialized)
- // This code assumes that this bucket is primary
- new DestroyOperation(event).distribute();
+ // This code assumes that this bucket is primary
+ // BR.destroy for retain
+ op = new DestroyOperation(event);
+ viewVersion = op.startOperation();
+ op.distribute();
+ }
+ super.basicDestroyBeforeRemoval(entry, event);
+ } finally {
+ if (op != null) {
+ op.endOperation(viewVersion);
+ }
}
- super.basicDestroyBeforeRemoval(entry, event);
}
@Override
@@ -1261,7 +1333,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
protected void distributeUpdateEntryVersionOperation(EntryEventImpl event) {
- new UpdateEntryVersionOperation(event).distribute();
+ UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
+ long viewVersion = op.startOperation();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
public int getRedundancyLevel() {
http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index bded899..a050a2b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -240,6 +240,31 @@ public abstract class DistributedCacheOperation {
return true;
}
+ public long startOperation() {
+ DistributedRegion region = getRegion();
+ long viewVersion = -1;
+ if (this.containsRegionContentChange()) {
+ viewVersion = region.getDistributionAdvisor().startOperation();
+ }
+ if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+ logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
+ viewVersion);
+ }
+ return viewVersion;
+ }
+
+ public void endOperation(long viewVersion) {
+ DistributedRegion region = getRegion();
+ if (viewVersion > 0) {
+ region.getDistributionAdvisor().endOperation(viewVersion);
+ viewVersion = -1;
+ if (logger.isDebugEnabled()) {
+ logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
+ viewVersion);
+ }
+ }
+ }
+
/**
* Distribute a cache operation to other members of the distributed system. This method determines
* who the recipients are and handles careful delivery of the operation to those members.
@@ -261,15 +286,6 @@ public abstract class DistributedCacheOperation {
boolean isPutAll = (this instanceof DistributedPutAllOperation);
boolean isRemoveAll = (this instanceof DistributedRemoveAllOperation);
- long viewVersion = -1;
- if (this.containsRegionContentChange()) {
- viewVersion = region.getDistributionAdvisor().startOperation();
- }
- if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
- logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
- viewVersion);
- }
-
try {
// Recipients with CacheOp
Set<InternalDistributedMember> recipients = getRecipients();
@@ -596,11 +612,6 @@ public abstract class DistributedCacheOperation {
}
}
- if (viewVersion > 0) {
- region.getDistributionAdvisor().endOperation(viewVersion);
- viewVersion = -1;
- }
-
/** compute local client routing before waiting for an ack only for a bucket */
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
@@ -639,13 +650,6 @@ public abstract class DistributedCacheOperation {
throw e;
} finally {
ReplyProcessor21.setShortSevereAlertProcessing(false);
- if (viewVersion != -1) {
- if (logger.isDebugEnabled()) {
- logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
- viewVersion);
- }
- region.getDistributionAdvisor().endOperation(viewVersion);
- }
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index fa02574..bd9cdd9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -437,12 +437,18 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
distribute = false;
}
if (distribute) {
+ // DR's put, it has notified gateway sender earlier
UpdateOperation op = new UpdateOperation(event, lastModified);
+ long viewVersion = op.startOperation();
if (logger.isTraceEnabled()) {
logger.trace("distributing operation for event : {} : for region : {}", event,
this.getName());
}
- op.distribute();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
}
}
@@ -1657,6 +1663,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
event.getRemoveAllOperation().addEntry(event, true);
}
if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ // DR.destroy, hasSeenEvent. no to notifyGateway
distributeDestroy(event, expectedOldValue);
event.invokeCallbacks(this, true, false);
}
@@ -1676,8 +1683,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
+ // DR.destroy, it has notifiedGatewaySender ealier
DestroyOperation op = new DestroyOperation(event);
- op.distribute();
+ long viewVersion = op.startOperation();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
}
}
@@ -1732,7 +1745,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
* @since GemFire 5.7
*/
protected void distributeInvalidateRegion(RegionEventImpl event) {
- new InvalidateRegionOperation(event).distribute();
+ InvalidateRegionOperation op = new InvalidateRegionOperation(event);
+ long viewVersion = op.startOperation();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
/**
@@ -1781,7 +1800,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (persistenceAdvisor != null) {
persistenceAdvisor.releaseTieLock();
}
- new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
+ DestroyRegionOperation op = new DestroyRegionOperation(event, notifyOfRegionDeparture);
+ long viewVersion = op.startOperation();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
/**
@@ -1858,8 +1883,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (event.isDistributed() && !event.isOriginRemote()) {
boolean distribute = !event.getInhibitDistribution();
if (distribute) {
+ // DR.invalidate, it has triggered callback earlier
InvalidateOperation op = new InvalidateOperation(event);
- op.distribute();
+ long viewVersion = op.startOperation();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
}
}
@@ -1890,8 +1921,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
&& !isTX() /* only distribute if non-tx */) {
if (event.isDistributed() && !event.isOriginRemote()) {
+ // DR has sent callback earlier
UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
- op.distribute();
+ long viewVersion = op.startOperation();
+ try {
+ op.distribute();
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 119dd43..6361aa6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -311,8 +311,14 @@ public class LocalRegionDataView implements InternalDataView {
successfulPuts.clear();
putallOp.fillVersionedObjectList(successfulPuts);
}
- region.postPutAllSend(putallOp, successfulPuts);
- region.postPutAllFireEvents(putallOp, successfulPuts);
+ // BR & DR's putAll
+ long viewVersion = putallOp.startOperation();
+ try {
+ region.postPutAllSend(putallOp, successfulPuts);
+ region.postPutAllFireEvents(putallOp, successfulPuts);
+ } finally {
+ putallOp.endOperation(viewVersion);
+ }
}
@Override
@@ -325,7 +331,13 @@ public class LocalRegionDataView implements InternalDataView {
successfulOps.clear();
op.fillVersionedObjectList(successfulOps);
}
- region.postRemoveAllSend(op, successfulOps);
- region.postRemoveAllFireEvents(op, successfulOps);
+ // BR, DR's removeAll
+ long viewVersion = op.startOperation();
+ try {
+ region.postRemoveAllSend(op, successfulOps);
+ region.postRemoveAllFireEvents(op, successfulOps);
+ } finally {
+ op.endOperation(viewVersion);
+ }
}
}