You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2016/02/19 00:09:56 UTC

incubator-geode git commit: GEODE-774

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 478e50d3a -> 4285d63a9


GEODE-774

When wan event was retried, the version tag might not be created.
Do not distribute such event.

Also add a unit test for the fix.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4285d63a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4285d63a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4285d63a

Branch: refs/heads/develop
Commit: 4285d63a96d1dff3014433117c4a9793ff3fdcf9
Parents: 478e50d
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Feb 18 14:03:10 2016 -0800
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Feb 18 14:21:22 2016 -0800

----------------------------------------------------------------------
 .../membership/InternalDistributedMember.java   |   2 +-
 .../gemfire/internal/cache/BucketAdvisor.java   |   2 +-
 .../gemfire/internal/cache/BucketRegion.java    |  26 ++-
 .../internal/cache/DistributedRegion.java       |  22 +-
 .../gemfire/internal/cache/EntryEventImpl.java  |   7 +
 .../internal/cache/BucketRegionJUnitTest.java   | 208 +++++++++++++++++++
 6 files changed, 249 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
index 8b5c279..81058f8 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/InternalDistributedMember.java
@@ -53,7 +53,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
  * This is the fundamental representation of a member of a GemFire distributed
  * system.
  */
-public final class InternalDistributedMember
+public class InternalDistributedMember
  implements DistributedMember,
     Externalizable, DataSerializableFixedID, ProfileId,
     VersionSource<DistributedMember>

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 98e72bd..a6649c3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -77,7 +77,7 @@ import com.gemstone.gemfire.internal.util.StopWatch;
  * @author Kirk Lund
  */
 @SuppressWarnings("synthetic-access")
-public final class BucketAdvisor extends CacheDistributionAdvisor  {
+public class BucketAdvisor extends CacheDistributionAdvisor  {
   private static final Logger logger = LogService.getLogger();
 
   public static final boolean ENFORCE_SAFE_CLOSE = false;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index fae381f..69f61c4 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -536,7 +536,9 @@ implements Bucket
       if (logger.isTraceEnabled(LogMarker.DM)) {
         logger.trace(LogMarker.DM, "BR.virtualPut: this cache has already seen this event {}", event);
       }
-      distributeUpdateOperation(event, lastModified);
+      if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+        distributeUpdateOperation(event, lastModified);
+      }
       return true;
     } finally {
       endLocalWrite(event);
@@ -919,13 +921,15 @@ implements Bucket
         if (logger.isTraceEnabled(LogMarker.DM)) {
           logger.trace(LogMarker.DM, "LR.basicInvalidate: this cache has already seen this event {}", event);
         }
-        if (!event.isOriginRemote()
-            && getBucketAdvisor().isPrimary()) {
-          // This cache has processed the event, forward operation
-          // and event messages to backup buckets
-          new InvalidateOperation(event).distribute();
+        if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          if (!event.isOriginRemote()
+              && getBucketAdvisor().isPrimary()) {
+            // This cache has processed the event, forward operation
+            // and event messages to backup buckets
+            new InvalidateOperation(event).distribute();
+          }
+          event.invokeCallbacks(this,true, false);
         }
-        event.invokeCallbacks(this,true, false);
         return;
       }
     } finally {
@@ -1179,7 +1183,9 @@ implements Bucket
         return;
       }
       else {
-        distributeDestroyOperation(event);
+    	if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          distributeDestroyOperation(event);
+    	}
         return;
       }
     } finally {
@@ -1305,7 +1311,9 @@ implements Bucket
       if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
         // This cache has processed the event, forward operation
         // and event messages to backup buckets
-        new UpdateEntryVersionOperation(event).distribute();
+    	if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          new UpdateEntryVersionOperation(event).distribute();
+    	}
       }
       return;
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 92b585a..f3e730a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -438,8 +438,10 @@ public class DistributedRegion extends LocalRegion implements
          * local AbstractRegionMap, and so will never be flipped to a 'create'
          */
         event.makeCreate();
-        distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
-        event.invokeCallbacks(this,true, true);
+        if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          distributeUpdate(event, lastModified, ifNew, ifOld, expectedOldValue, requireOldValue);
+          event.invokeCallbacks(this,true, true);
+        }
         return true;
       }
     }
@@ -1826,8 +1828,10 @@ public class DistributedRegion extends LocalRegion implements
         if (event.isBulkOpInProgress() && !event.isOriginRemote()) {
           event.getRemoveAllOperation().addEntry(event, true);
         }
-        distributeDestroy(event, expectedOldValue);
-        event.invokeCallbacks(this,true, false);
+        if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          distributeDestroy(event, expectedOldValue);
+          event.invokeCallbacks(this,true, false);
+        }
       }
     }
   }
@@ -2008,8 +2012,10 @@ public class DistributedRegion extends LocalRegion implements
       return;
     } finally {
       if (hasSeen) {
-        distributeInvalidate(event);
-        event.invokeCallbacks(this,true, false);
+    	if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          distributeInvalidate(event);
+          event.invokeCallbacks(this,true, false);
+    	}
       }
     }
   }
@@ -2045,7 +2051,9 @@ public class DistributedRegion extends LocalRegion implements
       }
       return;
     } finally {
-      distributeUpdateEntryVersion(event);
+      if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+        distributeUpdateEntryVersion(event);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 9cf2f13..c731721 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -2838,6 +2838,13 @@ public class EntryEventImpl
   }
   
   /**
+   * @return if there's no valid version tag for this event
+   */
+  public boolean hasValidVersionTag() {
+    return this.versionTag != null && this.versionTag.hasValidVersion();
+  }
+  
+  /**
    * this method joins together version tag timestamps and the "lastModified"
    * timestamps generated and stored in entries.  If a change does not already
    * carry a lastModified timestamp 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4285d63a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java
new file mode 100755
index 0000000..b2623fb
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/BucketRegionJUnitTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.distributed.internal.DSClock;
+import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+import junit.framework.TestCase;
+import static org.mockito.Mockito.*;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author gzhou
+ *
+ */
+@Category(UnitTest.class)
+public class BucketRegionJUnitTest extends TestCase {
+
+  private static final Logger logger = LogService.getLogger();
+  GemFireCacheImpl cache = null;
+  InternalDistributedMember member = null;
+  PartitionedRegion pr = null;
+  BucketAdvisor ba = null;
+  RegionAttributes ra = null;
+  InternalRegionArguments ira = null;
+  BucketRegion br = null;
+  
+  private EntryEventImpl prepare(boolean isConcurrencyChecksEnabled) {
+    // mock cache, distributed system, distribution manager, and distributed member
+    cache = mock(GemFireCacheImpl.class);
+    InternalDistributedSystem ids = mock(InternalDistributedSystem.class);
+    DistributionManager dm = mock(DistributionManager.class);
+    member = mock(InternalDistributedMember.class);
+    InvalidateOperation io = mock(InvalidateOperation.class);
+
+    // mock PR, bucket advisor, DSClock 
+    pr = mock(PartitionedRegion.class);
+    ba = mock(BucketAdvisor.class);
+    DSClock clock = mock(DSClock.class);
+
+    // create some real objects
+    DistributionConfigImpl config = new DistributionConfigImpl(new Properties());
+    ReadWriteLock primaryMoveLock = new ReentrantReadWriteLock();
+    Lock activeWriteLock = primaryMoveLock.readLock();
+    GemFireCacheImpl.Stopper stopper = mock(GemFireCacheImpl.Stopper.class);
+
+    // define the mock objects' behaviors
+    when(cache.getDistributedSystem()).thenReturn(ids);
+    when(ids.getClock()).thenReturn(clock);
+    when(ids.getDistributionManager()).thenReturn(dm);
+    when(ids.getDistributedMember()).thenReturn(member);
+    when(dm.getConfig()).thenReturn(config);
+    when(member.getRoles()).thenReturn(new HashSet());
+    when(ba.getActiveWriteLock()).thenReturn(activeWriteLock);
+    when (cache.getCancelCriterion()).thenReturn(stopper);
+    when(ids.getCancelCriterion()).thenReturn(stopper);
+
+    // create region attributes and internal region arguments
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+    factory.setConcurrencyChecksEnabled(isConcurrencyChecksEnabled); //
+    ra = factory.create();
+    ira = new InternalRegionArguments().setPartitionedRegion(pr)
+        .setPartitionedRegionBucketRedundancy(1)
+        .setBucketAdvisor(ba);
+    // create a bucket region object
+    br = new BucketRegion("testRegion", ra, null, cache, ira);
+    
+    // since br is a real bucket region object, we need to tell mockito to monitor it
+    br = Mockito.spy(br);
+
+    doNothing().when(br).distributeUpdateOperation(any(), anyLong());
+    doNothing().when(br).distributeDestroyOperation(any());
+    doNothing().when(br).checkForPrimary();
+    doNothing().when(br).handleWANEvent(any());
+    doReturn(false).when(br).needWriteLock(any());
+    doReturn(true).when(br).hasSeenEvent(any(EntryEventImpl.class));
+    
+    EntryEventImpl event = createDummyEvent();
+    return event;
+  }
+  
+  private EventID createDummyEventID() {
+    byte[] memId = { 1,2,3 };
+    EventID eventId = new EventID(memId, 11, 12, 13);
+    return eventId;
+  }
+
+  private EntryEventImpl createDummyEvent() {
+    // create a dummy event id
+    EventID eventId = createDummyEventID();
+    String key = "key1";
+    String value = "Value1";
+
+    // create an event
+    EntryEventImpl event = EntryEventImpl.create(br, Operation.CREATE, key,
+        value, null,  false /* origin remote */, null,
+        false /* generateCallbacks */,
+        eventId);
+
+    return event;
+  }
+  
+  private VersionTag createVersionTag(boolean invalid) {
+    VersionTag tag = VersionTag.create(member);
+    if (invalid == false) {
+      tag.setRegionVersion(1);
+      tag.setEntryVersion(1);
+    }
+    return tag;
+  }
+  
+  private void doTest(EntryEventImpl event, int cnt) {
+    // do the virtualPut test
+    br.virtualPut(event, false, false, null, false, 12345L, false);
+    // verify the result
+    if (cnt > 0) {
+      verify(br, times(cnt)).distributeUpdateOperation(eq(event), eq(12345L));
+    } else {
+      verify(br, never()).distributeUpdateOperation(eq(event), eq(12345L));
+    }
+    
+    // do the basicDestroy test
+    br.basicDestroy(event, false, null);
+    // verify the result
+    if (cnt > 0) {
+      verify(br, times(cnt)).distributeDestroyOperation(eq(event));
+    } else {
+      verify(br, never()).distributeDestroyOperation(eq(event));
+    }
+    
+  }
+  
+  @Test
+  public void testConcurrencyFalseTagNull() {
+    // case 1: concurrencyCheckEanbled = false, version tag is null: distribute
+    EntryEventImpl event = prepare(false);
+    assertNull(event.getVersionTag());
+    doTest(event, 1);
+  }
+
+  @Test
+  public void testConcurrencyTrueTagNull() {
+    // case 2: concurrencyCheckEanbled = true,  version tag is null: not to distribute
+    EntryEventImpl event = prepare(true);
+    assertNull(event.getVersionTag());
+    doTest(event, 0);
+  }
+  
+  @Test
+  public void testConcurrencyTrueTagInvalid() {
+    // case 3: concurrencyCheckEanbled = true,  version tag is invalid: not to distribute
+    EntryEventImpl event = prepare(true);
+    VersionTag tag = createVersionTag(true);
+    event.setVersionTag(tag);
+    assertFalse(tag.hasValidVersion());
+    doTest(event, 0);
+  }
+    
+  @Test
+  public void testConcurrencyTrueTagValid() {
+    // case 4: concurrencyCheckEanbled = true,  version tag is valid: distribute
+    EntryEventImpl event = prepare(true);
+    VersionTag tag = createVersionTag(false);
+    event.setVersionTag(tag);
+    assertTrue(tag.hasValidVersion());
+    doTest(event, 1);
+  }
+
+
+}
+