You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2016/02/19 00:09:56 UTC
incubator-geode git commit: GEODE-774
Repository: incubator-geode
Updated Branches:
refs/heads/develop 478e50d3a -> 4285d63a9
GEODE-774
When a WAN event is retried, the version tag might not have been created.
Do not distribute such an event.
Also add a unit test for the fix.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4285d63a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4285d63a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4285d63a
Branch: refs/heads/develop
Commit: 4285d63a96d1dff3014433117c4a9793ff3fdcf9
Parents: 478e50d
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Feb 18 14:03:10 2016 -0800
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Feb 18 14:21:22 2016 -0800
----------------------------------------------------------------------
.../membership/InternalDistributedMember.java | 2 +-
.../gemfire/internal/cache/BucketAdvisor.java | 2 +-
.../gemfire/internal/cache/BucketRegion.java | 26 ++-
.../internal/cache/DistributedRegion.java | 22 +-
.../gemfire/internal/cache/EntryEventImpl.java | 7 +
.../internal/cache/BucketRegionJUnitTest.java | 208 +++++++++++++++++++
6 files changed, 249 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
index 8b5c279..81058f8 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
@@ -53,7 +53,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
* This is the fundamental representation of a member of a GemFire distributed
* system.
*/
-public final class InternalDistributedMember
+public class InternalDistributedMember
implements DistributedMember,
Externalizable, DataSerializableFixedID, ProfileId,
VersionSource<DistributedMember>
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 98e72bd..a6649c3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -77,7 +77,7 @@ import com.gemstone.gemfire.internal.util.StopWatch;
* @author Kirk Lund
*/
@SuppressWarnings("synthetic-access")
-public final class BucketAdvisor extends CacheDistributionAdvisor {
+public class BucketAdvisor extends CacheDistributionAdvisor {
private static final Logger logger = LogService.getLogger();
public static final boolean ENFORCE_SAFE_CLOSE = false;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index fae381f..69f61c4 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -536,7 +536,9 @@ implements Bucket
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "BR.virtualPut: this cache has already seen this event {}", event);
}
- distributeUpdateOperation(event, lastModified);
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ distributeUpdateOperation(event, lastModified);
+ }
return true;
} finally {
endLocalWrite(event);
@@ -919,13 +921,15 @@ implements Bucket
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "LR.basicInvalidate: this cache has already seen this event {}", event);
}
- if (!event.isOriginRemote()
- && getBucketAdvisor().isPrimary()) {
- // This cache has processed the event, forward operation
- // and event messages to backup buckets
- new InvalidateOperation(event).distribute();
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ if (!event.isOriginRemote()
+ && getBucketAdvisor().isPrimary()) {
+ // This cache has processed the event, forward operation
+ // and event messages to backup buckets
+ new InvalidateOperation(event).distribute();
+ }
+ event.invokeCallbacks(this,true, false);
}
- event.invokeCallbacks(this,true, false);
return;
}
} finally {
@@ -1179,7 +1183,9 @@ implements Bucket
return;
}
else {
- distributeDestroyOperation(event);
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ distributeDestroyOperation(event);
+ }
return;
}
} finally {
@@ -1305,7 +1311,9 @@ implements Bucket
if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
// This cache has processed the event, forward operation
// and event messages to backup buckets
- new UpdateEntryVersionOperation(event).distribute();
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ new UpdateEntryVersionOperation(event).distribute();
+ }
}
return;
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 92b585a..f3e730a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -438,8 +438,10 @@ public class DistributedRegion extends LocalRegion implements
* local AbstractRegionMap, and so will never be flipped to a 'create'
*/
event.makeCreate();
- distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
- event.invokeCallbacks(this,true, true);
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
+ event.invokeCallbacks(this,true, true);
+ }
return true;
}
}
@@ -1826,8 +1828,10 @@ public class DistributedRegion extends LocalRegion implements
if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
event.getRemoveAllOperation().addEntry(event, true);
}
- distributeDestroy(event, expectedOldValue);
- event.invokeCallbacks(this,true, false);
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ distributeDestroy(event, expectedOldValue);
+ event.invokeCallbacks(this,true, false);
+ }
}
}
}
@@ -2008,8 +2012,10 @@ public class DistributedRegion extends LocalRegion implements
return;
} finally {
if (hasSeen) {
- distributeInvalidate(event);
- event.invokeCallbacks(this,true, false);
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ distributeInvalidate(event);
+ event.invokeCallbacks(this,true, false);
+ }
}
}
}
@@ -2045,7 +2051,9 @@ public class DistributedRegion extends LocalRegion implements
}
return;
} finally {
- distributeUpdateEntryVersion(event);
+ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+ distributeUpdateEntryVersion(event);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 9cf2f13..c731721 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -2838,6 +2838,13 @@ public class EntryEventImpl
}
/**
+ * @return true if this event has a non-null version tag with a valid version
+ */
+ public boolean hasValidVersionTag() {
+ return this.versionTag != null && this.versionTag.hasValidVersion();
+ }
+
+ /**
* this method joins together version tag timestamps and the "lastModified"
* timestamps generated and stored in entries. If a change does not already
* carry a lastModified timestamp
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java
new file mode 100755
index 0000000..b2623fb
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.distributed.internal.DSClock;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import junit.framework.TestCase;
+import static org.mockito.Mockito.*;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author gzhou
+ *
+ */
+@Category(UnitTest.class)
+public class BucketRegionJUnitTest extends TestCase {
+
+ private static final Logger logger = LogService.getLogger();
+ GemFireCacheImpl cache = null;
+ InternalDistributedMember member = null;
+ PartitionedRegion pr = null;
+ BucketAdvisor ba = null;
+ RegionAttributes ra = null;
+ InternalRegionArguments ira = null;
+ BucketRegion br = null;
+
+ private EntryEventImpl prepare(boolean isConcurrencyChecksEnabled) {
+ // mock cache, distributed system, distribution manager, and distributed member
+ cache = mock(GemFireCacheImpl.class);
+ InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+ DistributionManager dm = mock(DistributionManager.class);
+ member = mock(InternalDistributedMember.class);
+ InvalidateOperation io = mock(InvalidateOperation.class);
+
+ // mock PR, bucket advisor, DSClock
+ pr = mock(PartitionedRegion.class);
+ ba = mock(BucketAdvisor.class);
+ DSClock clock = mock(DSClock.class);
+
+ // create some real objects
+ DistributionConfigImpl config = new DistributionConfigImpl(new Properties());
+ ReadWriteLock primaryMoveLock = new ReentrantReadWriteLock();
+ Lock activeWriteLock = primaryMoveLock.readLock();
+ GemFireCacheImpl.Stopper stopper = mock(GemFireCacheImpl.Stopper.class);
+
+ // define the mock objects' behaviors
+ when(cache.getDistributedSystem()).thenReturn(ids);
+ when(ids.getClock()).thenReturn(clock);
+ when(ids.getDistributionManager()).thenReturn(dm);
+ when(ids.getDistributedMember()).thenReturn(member);
+ when(dm.getConfig()).thenReturn(config);
+ when(member.getRoles()).thenReturn(new HashSet());
+ when(ba.getActiveWriteLock()).thenReturn(activeWriteLock);
+ when (cache.getCancelCriterion()).thenReturn(stopper);
+ when(ids.getCancelCriterion()).thenReturn(stopper);
+
+ // create region attributes and internal region arguments
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ factory.setConcurrencyChecksEnabled(isConcurrencyChecksEnabled); //
+ ra = factory.create();
+ ira = new InternalRegionArguments().setPartitionedRegion(pr)
+ .setPartitionedRegionBucketRedundancy(1)
+ .setBucketAdvisor(ba);
+ // create a bucket region object
+ br = new BucketRegion("testRegion", ra, null, cache, ira);
+
+ // since br is a real bucket region object, we need to tell mockito to monitor it
+ br = Mockito.spy(br);
+
+ doNothing().when(br).distributeUpdateOperation(any(), anyLong());
+ doNothing().when(br).distributeDestroyOperation(any());
+ doNothing().when(br).checkForPrimary();
+ doNothing().when(br).handleWANEvent(any());
+ doReturn(false).when(br).needWriteLock(any());
+ doReturn(true).when(br).hasSeenEvent(any(EntryEventImpl.class));
+
+ EntryEventImpl event = createDummyEvent();
+ return event;
+ }
+
+ private EventID createDummyEventID() {
+ byte[] memId = { 1,2,3 };
+ EventID eventId = new EventID(memId, 11, 12, 13);
+ return eventId;
+ }
+
+ private EntryEventImpl createDummyEvent() {
+ // create a dummy event id
+ EventID eventId = createDummyEventID();
+ String key = "key1";
+ String value = "Value1";
+
+ // create an event
+ EntryEventImpl event = EntryEventImpl.create(br, Operation.CREATE, key,
+ value, null, false /* origin remote */, null,
+ false /* generateCallbacks */,
+ eventId);
+
+ return event;
+ }
+
+ private VersionTag createVersionTag(boolean invalid) {
+ VersionTag tag = VersionTag.create(member);
+ if (invalid == false) {
+ tag.setRegionVersion(1);
+ tag.setEntryVersion(1);
+ }
+ return tag;
+ }
+
+ private void doTest(EntryEventImpl event, int cnt) {
+ // do the virtualPut test
+ br.virtualPut(event, false, false, null, false, 12345L, false);
+ // verify the result
+ if (cnt > 0) {
+ verify(br, times(cnt)).distributeUpdateOperation(eq(event), eq(12345L));
+ } else {
+ verify(br, never()).distributeUpdateOperation(eq(event), eq(12345L));
+ }
+
+ // do the basicDestroy test
+ br.basicDestroy(event, false, null);
+ // verify the result
+ if (cnt > 0) {
+ verify(br, times(cnt)).distributeDestroyOperation(eq(event));
+ } else {
+ verify(br, never()).distributeDestroyOperation(eq(event));
+ }
+
+ }
+
+ @Test
+ public void testConcurrencyFalseTagNull() {
+ // case 1: concurrencyChecksEnabled = false, version tag is null: distribute
+ EntryEventImpl event = prepare(false);
+ assertNull(event.getVersionTag());
+ doTest(event, 1);
+ }
+
+ @Test
+ public void testConcurrencyTrueTagNull() {
+ // case 2: concurrencyChecksEnabled = true, version tag is null: do not distribute
+ EntryEventImpl event = prepare(true);
+ assertNull(event.getVersionTag());
+ doTest(event, 0);
+ }
+
+ @Test
+ public void testConcurrencyTrueTagInvalid() {
+ // case 3: concurrencyChecksEnabled = true, version tag is invalid: do not distribute
+ EntryEventImpl event = prepare(true);
+ VersionTag tag = createVersionTag(true);
+ event.setVersionTag(tag);
+ assertFalse(tag.hasValidVersion());
+ doTest(event, 0);
+ }
+
+ @Test
+ public void testConcurrencyTrueTagValid() {
+ // case 4: concurrencyChecksEnabled = true, version tag is valid: distribute
+ EntryEventImpl event = prepare(true);
+ VersionTag tag = createVersionTag(false);
+ event.setVersionTag(tag);
+ assertTrue(tag.hasValidVersion());
+ doTest(event, 1);
+ }
+
+
+}
+