You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/05/19 19:27:30 UTC
[25/36] incubator-geode git commit: GEODE-1224: fix mutation of
cloning on PRs
GEODE-1224: fix mutation of cloning on PRs
Changed BucketRegion getCloningEnabled and setCloningEnabled to delegate
to the bucket's PartitionedRegion.
This closes #142
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/123ddb7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/123ddb7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/123ddb7c
Branch: refs/heads/feature/GEODE-1372
Commit: 123ddb7ccf847203c35b314dd86dd6425a4cb500
Parents: 6dd3a58
Author: Scott Jewell <sj...@pivotal.io>
Authored: Fri Apr 29 13:37:22 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed May 11 15:29:41 2016 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/AbstractRegion.java | 2 +-
.../gemfire/internal/cache/BucketRegion.java | 10 ++
.../com/gemstone/gemfire/DeltaTestImpl.java | 10 ++
.../cache/PRDeltaPropagationDUnitTest.java | 173 +++++++++++++++++--
4 files changed, 182 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index ef7e4e3..8651cc8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -206,7 +206,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
*/
protected boolean offHeap;
- protected boolean cloningEnable = false;
+ private boolean cloningEnable = false;
protected DiskWriteAttributes diskWriteAttributes;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index f5ae0fb..e2482bb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -2407,6 +2407,16 @@ implements Bucket
}
@Override
+ public void setCloningEnabled(boolean isCloningEnabled){
+ this.partitionedRegion.setCloningEnabled(isCloningEnabled);
+ }
+
+ @Override
+ public boolean getCloningEnabled(){
+ return this.partitionedRegion.getCloningEnabled();
+ }
+
+ @Override
protected void generateLocalFilterRouting(InternalCacheEvent event) {
if (event.getLocalFilterInfo() == null) {
super.generateLocalFilterRouting(event);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java b/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
index cd82459..6356ab8 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/DeltaTestImpl.java
@@ -51,6 +51,7 @@ public class DeltaTestImpl implements DataSerializable, Delta {
private static long toDeltaInvokations;
private static long toDeltaFailure;
private static long fromDeltaFailure;
+ private static long timesConstructed = 0;
public static boolean NEED_TO_RESET_T0_DELTA = true;
/** *********************************************************************** */
@@ -69,6 +70,7 @@ public class DeltaTestImpl implements DataSerializable, Delta {
private boolean hasDelta = false;
public DeltaTestImpl() {
+ timesConstructed++;
}
public DeltaTestImpl(int intVal, String str) {
@@ -85,6 +87,14 @@ public class DeltaTestImpl implements DataSerializable, Delta {
this.testObj = testObj;
}
+ public static long getTimesConstructed() {
+ return timesConstructed;
+ }
+
+ public static void setTimesConstructed(long cnt) {
+ timesConstructed = cnt;
+ }
+
public void resetDeltaStatus() {
this.deltaBits = 0x0;
this.hasDelta = false;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/123ddb7c/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
index e8816f9..3b35954 100755
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/PRDeltaPropagationDUnitTest.java
@@ -156,6 +156,123 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
}
/**
+ * Monitor number of times constructor is called
+ * Without copy or cloning, we should have 1 instance
+ */
+ public void testConstructorCountWithoutCloning() throws Throwable {
+
+ clearConstructorCounts();
+ createCacheInAllPRVms();
+ createDeltaPR(Boolean.FALSE);
+
+ // Verify that cloning is disabled
+ assertFalse(((LocalRegion)deltaPR).getCloningEnabled());
+
+ verifyNoCopy();
+
+ putInitial(); // Does multiple puts
+
+ verifyConstructorCount(1);
+ }
+
+ /**
+ * Monitor number of times constructor is called
+ * With cloning, we should have more than 1 instance
+ * on members receiving delta updates
+ */
+ public void testConstructorCountWithCloning() throws Throwable {
+
+ clearConstructorCounts();
+ createCacheInAllPRVms();
+ createDeltaPRWithCloning(Boolean.FALSE);
+
+ // Verify that cloning is enabled
+ assertTrue(((LocalRegion)deltaPR).getCloningEnabled());
+
+ verifyNoCopy();
+
+ putInitial(); // Does multiple puts
+
+ verifyConstructorCount(2);
+
+ }
+
+ /**
+ * Create partition with cloning disabled, then
+ * enable cloning and verify proper operation
+ */
+ public void testConstructorCountWithMutator() throws Throwable {
+
+ clearConstructorCounts();
+ createCacheInAllPRVms();
+ createDeltaPR(Boolean.FALSE);
+
+ // Verify that cloning is disabled
+ assertFalse(((LocalRegion)deltaPR).getCloningEnabled());
+
+ verifyNoCopy();
+
+ PRDeltaTestImpl myDTI = putInitial(); // Does multiple puts
+
+ // With cloning disabled only single instance
+ verifyConstructorCount(1);
+
+ // Now set cloning enabled
+ setPRCloning(true);
+ // Verify that cloning is enabled
+ assertTrue(((LocalRegion)deltaPR).getCloningEnabled());
+
+ // With cloning enabled each put/delta should create a new instance
+ putMore(myDTI);
+
+ verifyConstructorCount(3);
+ }
+
+ private void verifyConstructorCount(int timesConstructed) throws Exception {
+
+ long buildCount0 = getBuildCount();
+ long buildCount1 = (long)dataStore1.invoke(() -> PRDeltaPropagationDUnitTest.getBuildCount());
+ long buildCount2 = (long)dataStore2.invoke(() -> PRDeltaPropagationDUnitTest.getBuildCount());
+
+ assertEquals(1, buildCount0);
+ assertEquals(timesConstructed, buildCount1);
+ assertEquals(timesConstructed, buildCount2);
+ }
+
+ private void setPRCloning(boolean cloning) {
+ setCloningEnabled(cloning);
+ dataStore1.invoke(() -> PRDeltaPropagationDUnitTest.setCloningEnabled(cloning));
+ dataStore2.invoke(() -> PRDeltaPropagationDUnitTest.setCloningEnabled(cloning));
+ }
+
+ private static void setCloningEnabled(boolean cloningEnabled) {
+ ((LocalRegion)deltaPR).setCloningEnabled(cloningEnabled);
+ }
+
+ private void verifyNoCopy() {
+ // Ensure not some other reason to make a copy
+ FilterProfile fp = ((LocalRegion)deltaPR).getFilterProfile();
+ boolean copy = ((LocalRegion)deltaPR).getCompressor() == null &&
+ (((LocalRegion)deltaPR).isCopyOnRead()
+ || (fp != null && fp.getCqCount() > 0));
+ assertFalse(copy);
+ }
+
+ private void clearConstructorCounts() throws Throwable {
+ setBuildCount(0);
+ dataStore1.invoke(() -> PRDeltaPropagationDUnitTest.setBuildCount(0));
+ dataStore2.invoke(() -> PRDeltaPropagationDUnitTest.setBuildCount(0));
+ }
+
+ public static long getBuildCount() throws Exception {
+ return DeltaTestImpl.getTimesConstructed();
+ }
+
+ public static void setBuildCount(long cnt) throws Exception {
+ DeltaTestImpl.setTimesConstructed(cnt);
+ }
+
+ /**
* Check delta propagation works properly with PR failover.
*/
@@ -760,17 +877,26 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
}
}
- private static void createDeltaPR(Boolean flag) {
+ private static void createDeltaPR(Boolean setExpiry) {
+ Object args[] = new Object[] { "DeltaPR", new Integer(1), new Integer(50),
+ new Integer(8), setExpiry, false, null };
+ createPR("DeltaPR", new Integer(1), new Integer(0), new Integer(8), setExpiry, false, null);
+ dataStore1.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
+ dataStore2.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
+
+ }
+
+ private static void createDeltaPRWithCloning(Boolean setExpiry) {
Object args[] = new Object[] { "DeltaPR", new Integer(1), new Integer(50),
- new Integer(8), flag, null };
- createPR("DeltaPR", new Integer(1), new Integer(0), new Integer(8), flag, null);
+ new Integer(8), setExpiry, true, null };
+ createPR("DeltaPR", new Integer(1), new Integer(0), new Integer(8), setExpiry, true, null);
dataStore1.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
dataStore2.invoke(PRDeltaPropagationDUnitTest.class, "createPR", args);
}
public static void createPR(String partitionedRegionName, Integer redundancy,
- Integer localMaxMemory, Integer totalNumBuckets, Boolean setExpiry, Compressor compressor) {
+ Integer localMaxMemory, Integer totalNumBuckets, Boolean setExpiry, Boolean withCloning, Compressor compressor) {
PartitionAttributesFactory paf = new PartitionAttributesFactory();
PartitionAttributes prAttr = paf.setRedundantCopies(redundancy.intValue())
@@ -779,7 +905,8 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
AttributesFactory attr = new AttributesFactory();
attr.setPartitionAttributes(prAttr);
attr.setDataPolicy(DataPolicy.PARTITION);
- attr.setConcurrencyChecksEnabled(true);
+ attr.setConcurrencyChecksEnabled(true);
+ attr.setCloningEnabled(withCloning);
if (setExpiry) {
attr.setStatisticsEnabled(true);
attr.setEntryIdleTimeout(new ExpirationAttributes(1,
@@ -800,14 +927,16 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
public static Integer createCacheServerWithPR(String partitionedRegionName,
Integer redundancy, Integer localMaxMemory, Integer totalNumBuckets,
Boolean setExpiry, Compressor compressor) {
- new PRDeltaPropagationDUnitTest("temp").createCache(new Properties());
+
+ createCacheInVm();
+
createPR(partitionedRegionName, redundancy, localMaxMemory,
- totalNumBuckets, setExpiry, compressor);
- assertNotNull(deltaPR);
+ totalNumBuckets, setExpiry, Boolean.FALSE, compressor);
deltaPR.put(DELTA_KEY, new PRDeltaTestImpl());
CacheServer server1 = cache.addCacheServer();
assertNotNull(server1);
+
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server1.setPort(port);
try {
@@ -996,18 +1125,39 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
}
public static void put() throws Exception {
- DeltaTestImpl test = new DeltaTestImpl();
+ PRDeltaTestImpl test = new PRDeltaTestImpl();
deltaPR.put(DELTA_KEY, test);
test.setIntVar(10);
deltaPR.put(DELTA_KEY, test);
- test = new DeltaTestImpl();
+ test = new PRDeltaTestImpl();
+
test.setStr("DELTA");
deltaPR.put(DELTA_KEY, test);
+ }
+
+ public static PRDeltaTestImpl putInitial() throws Exception {
+ PRDeltaTestImpl test = new PRDeltaTestImpl();
+ deltaPR.put(DELTA_KEY, test);
+
+ test.setIntVar(10);
+ deltaPR.put(DELTA_KEY, test);
+ test.setStr("DELTA");
+ deltaPR.put(DELTA_KEY, test);
+
+ return test;
}
+ public static void putMore(PRDeltaTestImpl val) throws Exception {
+ val.setIntVar(13);
+ put(val);
+
+ val.setStr("DELTA2");
+ put(val);
+ }
+
public static void put(PRDeltaTestImpl val) throws Exception {
deltaPR.put(DELTA_KEY, val);
}
@@ -1141,9 +1291,8 @@ public class PRDeltaPropagationDUnitTest extends DistributedTestCase {
static class PRDeltaTestImpl extends DeltaTestImpl {
int deltaSent = 0;
int deltaApplied = 0;
-
+
public PRDeltaTestImpl() {
-
}
public void toDelta(DataOutput out) throws IOException {