You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/05/19 19:27:30 UTC

[25/36] incubator-geode git commit: GEODE-1224: fix mutation of cloning on PRs

GEODE-1224: fix mutation of cloning on PRs

Changed BucketRegion getCloningEnabled and setCloningEnabled to delegate
to the bucket's PartitionedRegion.

This closes #142


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/123ddb7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/123ddb7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/123ddb7c

Branch: refs/heads/feature/GEODE-1372
Commit: 123ddb7ccf847203c35b314dd86dd6425a4cb500
Parents: 6dd3a58
Author: Scott Jewell <sj...@pivotal.io>
Authored: Fri Apr 29 13:37:22 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed May 11 15:29:41 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/AbstractRegion.java  |   2 +-
 .../gemfire/internal/cache/BucketRegion.java    |  10 ++
 .../com/gemstone/gemfire/DeltaTestImpl.java     |  10 ++
 .../cache/PRDeltaPropagationDUnitTest.java      | 173 +++++++++++++++++--
 4 files changed, 182 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index ef7e4e3..8651cc8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -206,7 +206,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
    */
   protected boolean offHeap;
 
-  protected boolean cloningEnable = false;
+  private boolean cloningEnable = false;
 
   protected DiskWriteAttributes diskWriteAttributes;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index f5ae0fb..e2482bb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -2407,6 +2407,16 @@ implements Bucket
   }
 
   @Override
+  public void setCloningEnabled(boolean isCloningEnabled){
+    this.partitionedRegion.setCloningEnabled(isCloningEnabled);
+  }
+
+  @Override
+  public boolean getCloningEnabled(){
+    return this.partitionedRegion.getCloningEnabled();
+  }
+
+  @Override
   protected void generateLocalFilterRouting(InternalCacheEvent event) {
     if (event.getLocalFilterInfo() == null) {
       super.generateLocalFilterRouting(event);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
index cd82459..6356ab8 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
@@ -51,6 +51,7 @@ public class DeltaTestImpl implements DataSerializable, Delta {
   private static long toDeltaInvokations;
   private static long toDeltaFailure;
   private static long fromDeltaFailure;
+  private static long timesConstructed = 0;
   public static boolean NEED_TO_RESET_T0_DELTA = true;
   /** *********************************************************************** */
   
@@ -69,6 +70,7 @@ public class DeltaTestImpl implements DataSerializable, Delta {
   private boolean hasDelta = false;
 
   public DeltaTestImpl() {
+    timesConstructed++;
   }
 
   public DeltaTestImpl(int intVal, String str) {
@@ -85,6 +87,14 @@ public class DeltaTestImpl implements DataSerializable, Delta {
     this.testObj = testObj;
   }
 
+  public static long getTimesConstructed() {
+    return timesConstructed;
+  }
+  
+  public static void setTimesConstructed(long cnt) {
+    timesConstructed = cnt;
+  }
+  
   public void resetDeltaStatus() {
     this.deltaBits = 0x0;
     this.hasDelta = false;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
index e8816f9..3b35954 100755
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
@@ -156,6 +156,123 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
   }
 
   /**
+   *  Monitor number of times constructor is called
+   *  Without copy or cloning, we should have 1 instance
+   */
+  public void testConstructorCountWithoutCloning() throws Throwable {
+
+    clearConstructorCounts();
+    createCacheInAllPRVms();
+    createDeltaPR(Boolean.FALSE);
+    
+    // Verify that cloning is disabled
+    assertFalse(((LocalRegion)deltaPR).getCloningEnabled());
+
+    verifyNoCopy();
+    
+    putInitial();  // Does multiple puts
+   
+    verifyConstructorCount(1);
+  }
+  
+  /**
+   *  Monitor number of times constructor is called
+   *  With cloning, we should have more than 1 instance
+   *  on members receiving delta updates
+   */
+  public void testConstructorCountWithCloning() throws Throwable {
+
+    clearConstructorCounts();
+    createCacheInAllPRVms();
+    createDeltaPRWithCloning(Boolean.FALSE);
+        
+    // Verify that cloning is enabled
+    assertTrue(((LocalRegion)deltaPR).getCloningEnabled());
+
+    verifyNoCopy();
+    
+    putInitial();  // Does multiple puts
+   
+    verifyConstructorCount(2);
+
+  }
+
+  /**
+   *  Create partition with cloning disabled, then
+   *  enable cloning and verify proper operation 
+   */
+  public void testConstructorCountWithMutator() throws Throwable {
+
+    clearConstructorCounts();
+    createCacheInAllPRVms();
+    createDeltaPR(Boolean.FALSE);
+        
+    // Verify that cloning is disabled
+    assertFalse(((LocalRegion)deltaPR).getCloningEnabled());
+
+    verifyNoCopy();
+    
+    PRDeltaTestImpl myDTI = putInitial();  // Does multiple puts
+   
+    // With cloning disabled only single instance
+    verifyConstructorCount(1);
+
+    // Now set cloning enabled
+    setPRCloning(true);
+    // Verify that cloning is enabled
+    assertTrue(((LocalRegion)deltaPR).getCloningEnabled());
+  
+    // With cloning enabled each put/delta should create a new instance
+    putMore(myDTI);  
+    
+    verifyConstructorCount(3);
+  }
+
+  private void verifyConstructorCount(int timesConstructed) throws Exception {
+        
+    long buildCount0 = getBuildCount();
+    long buildCount1 = (long)dataStore1.invoke(() -> PRDeltaPropagationDUnitTest.getBuildCount());
+    long buildCount2 = (long)dataStore2.invoke(() -> PRDeltaPropagationDUnitTest.getBuildCount());
+    
+    assertEquals(1, buildCount0);
+    assertEquals(timesConstructed, buildCount1);
+    assertEquals(timesConstructed, buildCount2);
+  }
+
+  private void setPRCloning(boolean cloning) {
+    setCloningEnabled(cloning);
+    dataStore1.invoke(() -> PRDeltaPropagationDUnitTest.setCloningEnabled(cloning));
+    dataStore2.invoke(() -> PRDeltaPropagationDUnitTest.setCloningEnabled(cloning));
+  }
+  
+  private static void setCloningEnabled(boolean cloningEnabled) {
+    ((LocalRegion)deltaPR).setCloningEnabled(cloningEnabled);
+  }
+  
+  private void verifyNoCopy() {
+    // Ensure not some other reason to make a copy
+    FilterProfile fp = ((LocalRegion)deltaPR).getFilterProfile();
+    boolean copy = ((LocalRegion)deltaPR).getCompressor() == null &&
+        (((LocalRegion)deltaPR).isCopyOnRead()
+        || (fp != null && fp.getCqCount() > 0));
+    assertFalse(copy);
+  }
+  
+  private void clearConstructorCounts() throws Throwable {
+    setBuildCount(0);
+    dataStore1.invoke(() -> PRDeltaPropagationDUnitTest.setBuildCount(0));
+    dataStore2.invoke(() -> PRDeltaPropagationDUnitTest.setBuildCount(0));
+  }
+  
+  public static long getBuildCount() throws Exception {
+    return DeltaTestImpl.getTimesConstructed();
+  }
+
+  public static void setBuildCount(long cnt) throws Exception {
+    DeltaTestImpl.setTimesConstructed(cnt);
+  }
+
+  /**
    * Check delta propagation works properly with PR failover.    
    */
 
@@ -760,17 +877,26 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
     }
   }
 
-  private static void createDeltaPR(Boolean flag) {
+  private static void createDeltaPR(Boolean setExpiry) {
+    Object args[] = new Object[] { "DeltaPR", new Integer(1), new Integer(50),
+        new Integer(8), setExpiry, false, null };
+    createPR("DeltaPR", new Integer(1), new Integer(0), new Integer(8), setExpiry, false, null);
+    dataStore1.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
+    dataStore2.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
+
+  }
+
+  private static void createDeltaPRWithCloning(Boolean setExpiry) {
     Object args[] = new Object[] { "DeltaPR", new Integer(1), new Integer(50),
-        new Integer(8), flag, null };
-    createPR("DeltaPR", new Integer(1), new Integer(0), new Integer(8), flag, null);
+        new Integer(8), setExpiry, true, null };
+    createPR("DeltaPR", new Integer(1), new Integer(0), new Integer(8), setExpiry, true, null);
     dataStore1.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
     dataStore2.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
 
   }
 
   public static void createPR(String partitionedRegionName, Integer redundancy,
-      Integer localMaxMemory, Integer totalNumBuckets, Boolean setExpiry, Compressor compressor) {
+      Integer localMaxMemory, Integer totalNumBuckets, Boolean setExpiry, Boolean withCloning, Compressor compressor) {
 
     PartitionAttributesFactory paf = new PartitionAttributesFactory();
     PartitionAttributes prAttr = paf.setRedundantCopies(redundancy.intValue())
@@ -779,7 +905,8 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
     AttributesFactory attr = new AttributesFactory();
     attr.setPartitionAttributes(prAttr);
     attr.setDataPolicy(DataPolicy.PARTITION);
-    attr.setConcurrencyChecksEnabled(true);
+    attr.setConcurrencyChecksEnabled(true);    
+    attr.setCloningEnabled(withCloning);    
     if (setExpiry) {
       attr.setStatisticsEnabled(true);
       attr.setEntryIdleTimeout(new ExpirationAttributes(1,
@@ -800,14 +927,16 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
   public static Integer createCacheServerWithPR(String partitionedRegionName,
       Integer redundancy, Integer localMaxMemory, Integer totalNumBuckets,
       Boolean setExpiry, Compressor compressor) {
-    new PRDeltaPropagationDUnitTest("temp").createCache(new Properties());
+    
+    createCacheInVm();
+    
     createPR(partitionedRegionName, redundancy, localMaxMemory,
-        totalNumBuckets, setExpiry, compressor);
-    assertNotNull(deltaPR);
+        totalNumBuckets, setExpiry, Boolean.FALSE, compressor);
     deltaPR.put(DELTA_KEY, new PRDeltaTestImpl());
     
     CacheServer server1 = cache.addCacheServer();
     assertNotNull(server1);
+    
     int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
     server1.setPort(port);
     try {
@@ -996,18 +1125,39 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
   }
 
   public static void put() throws Exception {
-    DeltaTestImpl test = new DeltaTestImpl();
+    PRDeltaTestImpl test = new PRDeltaTestImpl();
     deltaPR.put(DELTA_KEY, test);
 
     test.setIntVar(10);
     deltaPR.put(DELTA_KEY, test);
 
-    test = new DeltaTestImpl();
+    test = new PRDeltaTestImpl();
+    
     test.setStr("DELTA");
     deltaPR.put(DELTA_KEY, test);
+  }
+
+  public static PRDeltaTestImpl putInitial() throws Exception {
+    PRDeltaTestImpl test = new PRDeltaTestImpl();
+    deltaPR.put(DELTA_KEY, test);
+
+    test.setIntVar(10);
+    deltaPR.put(DELTA_KEY, test);
 
+    test.setStr("DELTA");
+    deltaPR.put(DELTA_KEY, test);
+    
+    return test;
   }
 
+  public static void putMore(PRDeltaTestImpl val) throws Exception {
+    val.setIntVar(13);
+    put(val);
+
+    val.setStr("DELTA2");
+    put(val);
+  }
+  
   public static void put(PRDeltaTestImpl val) throws Exception {
     deltaPR.put(DELTA_KEY, val);
   }
@@ -1141,9 +1291,8 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
   static class PRDeltaTestImpl extends DeltaTestImpl {
     int deltaSent = 0;
     int deltaApplied = 0;
-
+    
     public PRDeltaTestImpl() {
-
     }
 
     public void toDelta(DataOutput out) throws IOException {