You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/01/07 23:00:30 UTC
[1/3] incubator-geode git commit: Remove Disabled from names of
tests. Ensure each test has a Category and add Ignore to any test that is
disabled due to being broken.
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-714 b417b275b -> 4f0776572
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java
deleted file mode 100755
index ffd5726..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.gemstone.gemfire.cache.EvictionAlgorithm;
-import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
-import com.gemstone.gemfire.internal.cache.lru.MemLRUCapacityController;
-
-public class EvictionDUnitDisabledTest extends EvictionTestBase {
- private static final long serialVersionUID = 270073077723092256L;
-
- public EvictionDUnitDisabledTest(String name) {
- super(name);
- }
-
- public void testDummyInlineNCentralizedEviction() {
- prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
- putData("PR1", 50, 1);
-
- final int expectedEviction1 = getExpectedEvictionRatioOnVm(dataStore1);
- final int expectedEviction2 = getExpectedEvictionRatioOnVm(dataStore2);
-
- raiseFakeNotification(dataStore1, "PR1", expectedEviction1);
- raiseFakeNotification(dataStore2, "PR1", expectedEviction2);
- validateNoOfEvictions("PR1", expectedEviction1 + expectedEviction2);
-
- putData("PR1", 4, 1);
- validateNoOfEvictions("PR1", 4 + expectedEviction1 + expectedEviction2);
- }
-
- public void testThreadPoolSize() {
- prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
- putData("PR1", 50, 1);
- raiseFakeNotification(dataStore1, "PR1", getExpectedEvictionRatioOnVm(dataStore1));
- verifyThreadPoolTaskCount(HeapEvictor.MAX_EVICTOR_THREADS);
- }
-
- public void testCentralizedEvictionnForDistributedRegionWithDummyEvent() {
- prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
- createDistributedRegion();
- putDataInDistributedRegion(50, 1);
- raiseFakeNotification(dataStore1, "DR1", getExpectedEvictionRatioOnVm(dataStore1));
- }
-
- /**
- * Test Case Description: 2 VM's. 2 PR's. 4 buckets each PR. PR1 has action
- * -Local destroy and PR2 has action - Overflow To Disk.
- *
- * Test Case verifies:If naturally Eviction up and eviction Down events are
- * raised. Centralized and Inline eviction are happening.All this verificatio
- * is done thorugh logs. It also verifies that during eviction, if one node
- * goes down and then comes up again causing GII to take place, the system
- * doesnot throw an OOME.
- */
- public void testEvictionWithNodeDown() {
- prepareScenario2(EvictionAlgorithm.LRU_HEAP, "PR3", "PR4");
- putDataInDataStore3("PR3", 100, 1);
- fakeNotification();
- print("PR3");
- killVm();
- bringVMBackToLife();
- assertEquals(100, getPRSize("PR3"));
- assertEquals(0, getPRSize("PR4"));
- }
-
- public void testEntryLruEvictions() {
- int extraEntries=1;
- createCache();
- maxEnteries=3;
- createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
-
- final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
- getLogWriter().info(
- "PR- " +pr.getEvictionAttributes().getMaximum());
-
- for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
- pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
- }
-
- assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
- }
-
-
- public void testEntryLru() {
- createCache();
- maxEnteries=12;
- createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
-
- final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
- getLogWriter().info(
- "PR- " +pr.getEvictionAttributes().getMaximum());
- for (int i = 0; i < 3; i++) {
- // assume mod-based hashing for bucket creation
- pr.put(new Integer(i), "value0");
- pr.put(new Integer(i
- + pr.getPartitionAttributes().getTotalNumBuckets()), "value1");
- pr.put(new Integer(i
- + (pr.getPartitionAttributes().getTotalNumBuckets()) * 2),
- "value2");
- }
- pr.put(new Integer(3), "value0");
-
- for (int i = 0; i < 2; i++) {
- pr.put(new Integer(i
- + pr.getPartitionAttributes().getTotalNumBuckets())*3, "value1");
- }
- assertEquals(0,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
- }
-
- public void testCheckEntryLruEvictionsIn1DataStore() {
- int extraEntries=10;
- createCache();
- maxEnteries=20;
- createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 5, 1, 1000,maxEnteries);
-
- final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
- getLogWriter().info(
- "PR- " +pr.getEvictionAttributes().getMaximum());
-
- for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
- pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
- }
-
- assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
-
- for (final Iterator i = pr.getDataStore().getAllLocalBuckets().iterator(); i
- .hasNext();) {
- final Map.Entry entry = (Map.Entry)i.next();
- final BucketRegion bucketRegion = (BucketRegion)entry.getValue();
- if (bucketRegion == null) {
- continue;
- }
- getLogWriter().info(
- "FINAL bucket= " + bucketRegion.getFullPath() + "size= "
- + bucketRegion.size() + " count= "+bucketRegion.entryCount());
- assertEquals(4,bucketRegion.size());
- }
- }
-
- public void testCheckEntryLruEvictionsIn2DataStore() {
- maxEnteries=20;
- prepareScenario1(EvictionAlgorithm.LRU_ENTRY,maxEnteries);
- putData("PR1", 60, 1);
- validateNoOfEvictions("PR1", 20);
- }
-
-
- public void testMemLruForPRAndDR() {
- createCache();
- createPartitionedRegion(true, EvictionAlgorithm.LRU_MEMORY, "PR1", 4, 1, 1000,40);
- createDistRegionWithMemEvictionAttr();
- PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
- DistributedRegion dr = (DistributedRegion)cache.getRegion("DR1");
-
- assertEquals(pr.getLocalMaxMemory(), pr.getEvictionAttributes().getMaximum());
- assertEquals(MemLRUCapacityController.DEFAULT_MAXIMUM_MEGABYTES, dr.getEvictionAttributes().getMaximum());
-
- for (int i = 0; i < 41; i++) {
- pr.put(new Integer(i), new byte[1 * 1024 * 1024]);
- }
-
- assertTrue(1<=((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
- assertTrue(((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions()<=2);
-
- for (int i = 0; i < 11; i++) {
- dr.put(new Integer(i), new byte[1 * 1024 * 1024]);
- }
-
- assertTrue(1<=((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions());
- assertTrue(((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions()<=2);
- }
-
- public void testEachTaskSize() {
- createCache();
- createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR1", 6, 1,
- 1000, 40);
- createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR2", 10, 1,
- 1000, 40);
- createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR3", 15, 1,
- 1000, 40);
- createDistRegion();
-
- ArrayList<Integer> taskSetSizes = getTestTaskSetSizes();
- if (taskSetSizes != null) {
- for (Integer size : taskSetSizes) {
- assertEquals(8, size.intValue());
- }
- }
-
- /*
- final PartitionedRegion pr1 = (PartitionedRegion)cache.getRegion("PR1");
- final PartitionedRegion pr2 = (PartitionedRegion)cache.getRegion("PR2");
- final PartitionedRegion pr3 = (PartitionedRegion)cache.getRegion("PR3");
- final DistributedRegion dr1 = (DistributedRegion)cache.getRegion("DR1");
-
- for (int counter = 1; counter <= 18; counter++) {
- pr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
- }
- getLogWriter().info("Size of PR1 before eviction = "+ pr1.size());
-
- for (int counter = 1; counter <= 30; counter++) {
- pr2.put(new Integer(counter), new byte[1 * 1024 * 1024]);
- }
- getLogWriter().info("Size of PR2 before eviction = "+ pr2.size());
-
- for (int counter = 1; counter <= 45; counter++) {
- pr3.put(new Integer(counter), new byte[1 * 1024 * 1024]);
- }
- getLogWriter().info("Size of PR3 before eviction = "+ pr3.size());
-
- for (int counter = 1; counter <= 150; counter++) {
- dr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
- }
- getLogWriter().info("Size of DR1 before eviction = "+ dr1.size());
-
-
- getLogWriter().info("Size of PR1 after eviction = "+ pr1.size());
- getLogWriter().info("Size of PR2 after eviction = "+ pr2.size());
- getLogWriter().info("Size of PR3 after eviction = "+ pr3.size());
- getLogWriter().info("Size of PR4 after eviction = "+ dr1.size());*/
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java
new file mode 100755
index 0000000..33807b7
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.EvictionAlgorithm;
+import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
+import com.gemstone.gemfire.internal.cache.lru.MemLRUCapacityController;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class EvictionDUnitTest extends EvictionTestBase {
+ private static final long serialVersionUID = 270073077723092256L;
+
+ public EvictionDUnitTest(String name) {
+ super(name);
+ }
+
+ public void testDummyInlineNCentralizedEviction() {
+ prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
+ putData("PR1", 50, 1);
+
+ final int expectedEviction1 = getExpectedEvictionRatioOnVm(dataStore1);
+ final int expectedEviction2 = getExpectedEvictionRatioOnVm(dataStore2);
+
+ raiseFakeNotification(dataStore1, "PR1", expectedEviction1);
+ raiseFakeNotification(dataStore2, "PR1", expectedEviction2);
+ validateNoOfEvictions("PR1", expectedEviction1 + expectedEviction2);
+
+ putData("PR1", 4, 1);
+ validateNoOfEvictions("PR1", 4 + expectedEviction1 + expectedEviction2);
+ }
+
+ public void testThreadPoolSize() {
+ prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
+ putData("PR1", 50, 1);
+ raiseFakeNotification(dataStore1, "PR1", getExpectedEvictionRatioOnVm(dataStore1));
+ verifyThreadPoolTaskCount(HeapEvictor.MAX_EVICTOR_THREADS);
+ }
+
+ public void testCentralizedEvictionnForDistributedRegionWithDummyEvent() {
+ prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
+ createDistributedRegion();
+ putDataInDistributedRegion(50, 1);
+ raiseFakeNotification(dataStore1, "DR1", getExpectedEvictionRatioOnVm(dataStore1));
+ }
+
+ /**
+ * Test Case Description: 2 VM's. 2 PR's. 4 buckets each PR. PR1 has action
+ * -Local destroy and PR2 has action - Overflow To Disk.
+ *
+ * Test Case verifies: If naturally Eviction up and eviction Down events are
+ * raised. Centralized and Inline eviction are happening. All this verification
+ * is done through logs. It also verifies that during eviction, if one node
+ * goes down and then comes up again causing GII to take place, the system
+ * does not throw an OOME.
+ */
+ public void testEvictionWithNodeDown() {
+ prepareScenario2(EvictionAlgorithm.LRU_HEAP, "PR3", "PR4");
+ putDataInDataStore3("PR3", 100, 1);
+ fakeNotification();
+ print("PR3");
+ killVm();
+ bringVMBackToLife();
+ assertEquals(100, getPRSize("PR3"));
+ assertEquals(0, getPRSize("PR4"));
+ }
+
+ public void testEntryLruEvictions() {
+ int extraEntries=1;
+ createCache();
+ maxEnteries=3;
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
+
+ final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+ getLogWriter().info(
+ "PR- " +pr.getEvictionAttributes().getMaximum());
+
+ for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
+ pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+ }
+
+ assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+ }
+
+
+ public void testEntryLru() {
+ createCache();
+ maxEnteries=12;
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
+
+ final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+ getLogWriter().info(
+ "PR- " +pr.getEvictionAttributes().getMaximum());
+ for (int i = 0; i < 3; i++) {
+ // assume mod-based hashing for bucket creation
+ pr.put(new Integer(i), "value0");
+ pr.put(new Integer(i
+ + pr.getPartitionAttributes().getTotalNumBuckets()), "value1");
+ pr.put(new Integer(i
+ + (pr.getPartitionAttributes().getTotalNumBuckets()) * 2),
+ "value2");
+ }
+ pr.put(new Integer(3), "value0");
+
+ for (int i = 0; i < 2; i++) {
+ pr.put(new Integer(i
+ + pr.getPartitionAttributes().getTotalNumBuckets())*3, "value1");
+ }
+ assertEquals(0,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+ }
+
+ public void testCheckEntryLruEvictionsIn1DataStore() {
+ int extraEntries=10;
+ createCache();
+ maxEnteries=20;
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 5, 1, 1000,maxEnteries);
+
+ final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+ getLogWriter().info(
+ "PR- " +pr.getEvictionAttributes().getMaximum());
+
+ for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
+ pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+ }
+
+ assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+
+ for (final Iterator i = pr.getDataStore().getAllLocalBuckets().iterator(); i
+ .hasNext();) {
+ final Map.Entry entry = (Map.Entry)i.next();
+ final BucketRegion bucketRegion = (BucketRegion)entry.getValue();
+ if (bucketRegion == null) {
+ continue;
+ }
+ getLogWriter().info(
+ "FINAL bucket= " + bucketRegion.getFullPath() + "size= "
+ + bucketRegion.size() + " count= "+bucketRegion.entryCount());
+ assertEquals(4,bucketRegion.size());
+ }
+ }
+
+ public void testCheckEntryLruEvictionsIn2DataStore() {
+ maxEnteries=20;
+ prepareScenario1(EvictionAlgorithm.LRU_ENTRY,maxEnteries);
+ putData("PR1", 60, 1);
+ validateNoOfEvictions("PR1", 20);
+ }
+
+
+ public void testMemLruForPRAndDR() {
+ createCache();
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_MEMORY, "PR1", 4, 1, 1000,40);
+ createDistRegionWithMemEvictionAttr();
+ PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+ DistributedRegion dr = (DistributedRegion)cache.getRegion("DR1");
+
+ assertEquals(pr.getLocalMaxMemory(), pr.getEvictionAttributes().getMaximum());
+ assertEquals(MemLRUCapacityController.DEFAULT_MAXIMUM_MEGABYTES, dr.getEvictionAttributes().getMaximum());
+
+ for (int i = 0; i < 41; i++) {
+ pr.put(new Integer(i), new byte[1 * 1024 * 1024]);
+ }
+
+ assertTrue(1<=((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+ assertTrue(((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions()<=2);
+
+ for (int i = 0; i < 11; i++) {
+ dr.put(new Integer(i), new byte[1 * 1024 * 1024]);
+ }
+
+ assertTrue(1<=((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions());
+ assertTrue(((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions()<=2);
+ }
+
+ public void testEachTaskSize() {
+ createCache();
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR1", 6, 1,
+ 1000, 40);
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR2", 10, 1,
+ 1000, 40);
+ createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR3", 15, 1,
+ 1000, 40);
+ createDistRegion();
+
+ ArrayList<Integer> taskSetSizes = getTestTaskSetSizes();
+ if (taskSetSizes != null) {
+ for (Integer size : taskSetSizes) {
+ assertEquals(8, size.intValue());
+ }
+ }
+
+ /*
+ final PartitionedRegion pr1 = (PartitionedRegion)cache.getRegion("PR1");
+ final PartitionedRegion pr2 = (PartitionedRegion)cache.getRegion("PR2");
+ final PartitionedRegion pr3 = (PartitionedRegion)cache.getRegion("PR3");
+ final DistributedRegion dr1 = (DistributedRegion)cache.getRegion("DR1");
+
+ for (int counter = 1; counter <= 18; counter++) {
+ pr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+ }
+ getLogWriter().info("Size of PR1 before eviction = "+ pr1.size());
+
+ for (int counter = 1; counter <= 30; counter++) {
+ pr2.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+ }
+ getLogWriter().info("Size of PR2 before eviction = "+ pr2.size());
+
+ for (int counter = 1; counter <= 45; counter++) {
+ pr3.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+ }
+ getLogWriter().info("Size of PR3 before eviction = "+ pr3.size());
+
+ for (int counter = 1; counter <= 150; counter++) {
+ dr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+ }
+ getLogWriter().info("Size of DR1 before eviction = "+ dr1.size());
+
+
+ getLogWriter().info("Size of PR1 after eviction = "+ pr1.size());
+ getLogWriter().info("Size of PR2 after eviction = "+ pr2.size());
+ getLogWriter().info("Size of PR3 after eviction = "+ pr3.size());
+ getLogWriter().info("Size of PR4 after eviction = "+ dr1.size());*/
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
index 386f8ce..060aea7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
@@ -34,7 +34,7 @@ import dunit.VM;
* Performs eviction dunit tests for off-heap memory.
* @author rholmes
*/
-public class OffHeapEvictionDUnitTest extends EvictionDUnitDisabledTest {
+public class OffHeapEvictionDUnitTest extends EvictionDUnitTest {
public OffHeapEvictionDUnitTest(String name) {
super(name);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java
deleted file mode 100644
index 6b957e8..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.ha;
-
-import java.util.Iterator;
-import java.util.Properties;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.internal.PoolImpl;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache30.ClientServerTestCase;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.internal.AvailablePort;
-
-import dunit.DistributedTestCase;
-import dunit.Host;
-import dunit.VM;
-
-/**
- * This is Dunit test for bug 36109. This test has a cache-client having a primary
- * and a secondary cache-server as its endpoint. Primary does some operations
- * and is stopped, the client fails over to secondary and does some operations
- * and it is verified that the 'invalidates' stats at the client is same as the
- * total number of operations done by both primary and secondary. The bug was
- * appearing because invalidate stats was part of Endpoint which used to get
- * closed during fail over , with the failed endpoint getting closed. This bug
- * has been fixed by moving the invalidate stat to be part of our implementation.
- *
- * @author Dinesh Patel
- *
- */
-public class StatsBugDUnitDisabledTest extends DistributedTestCase
-{
- /** primary cache server */
- VM primary = null;
-
- /** secondary cache server */
- VM secondary = null;
-
- /** the cache client */
- VM client1 = null;
-
- /** the cache */
- private static Cache cache = null;
-
- /** port for the primary cache server */
- private static int PORT1;
-
- /** port for the secondary cache server */
- private static int PORT2;
-
- /** name of the test region */
- private static final String REGION_NAME = "StatsBugDUnitTest_Region";
-
- /** brige-writer instance( used to get connection proxy handle) */
- private static PoolImpl pool = null;
-
- /** total number of cache servers */
- private static final int TOTAL_SERVERS = 2;
-
- /** number of puts done by each server */
- private static final int PUTS_PER_SERVER = 10;
-
- /** prefix added to the keys of events generated on primary */
- private static final String primaryPrefix = "primary_";
-
- /** prefix added to the keys of events generated on secondary */
- private static final String secondaryPrefix = "secondary_";
-
- /**
- * Constructor
- *
- * @param name -
- * name for this test instance
- */
- public StatsBugDUnitDisabledTest(String name) {
- super(name);
- }
-
- /**
- * Creates the primary and the secondary cache servers
- *
- * @throws Exception -
- * thrown if any problem occurs in initializing the test
- */
- public void setUp() throws Exception
- {
- disconnectAllFromDS();
- super.setUp();
- final Host host = Host.getHost(0);
- primary = host.getVM(0);
- secondary = host.getVM(1);
- client1 = host.getVM(2);
- PORT1 = ((Integer)primary.invoke(StatsBugDUnitDisabledTest.class,
- "createServerCache")).intValue();
- PORT2 = ((Integer)secondary.invoke(StatsBugDUnitDisabledTest.class,
- "createServerCache")).intValue();
- }
-
- /**
- * Create the cache
- *
- * @param props -
- * properties for DS
- * @return the cache instance
- * @throws Exception -
- * thrown if any problem occurs in cache creation
- */
- private Cache createCache(Properties props) throws Exception
- {
- DistributedSystem ds = getSystem(props);
- ds.disconnect();
- ds = getSystem(props);
- Cache cache = null;
- cache = CacheFactory.create(ds);
- if (cache == null) {
- throw new Exception("CacheFactory.create() returned null ");
- }
- return cache;
- }
-
- /**
- * close the cache instances in server and client during tearDown
- *
- * @throws Exception
- * thrown if any problem occurs in closing cache
- */
- public void tearDown2() throws Exception
- {
- super.tearDown2();
- // close client
- client1.invoke(StatsBugDUnitDisabledTest.class, "closeCache");
-
- // close server
- primary.invoke(StatsBugDUnitDisabledTest.class, "closeCache");
- secondary.invoke(StatsBugDUnitDisabledTest.class, "closeCache");
- }
-
- /**
- * This test does the following:<br>
- * 1)Create and populate the client<br>
- * 2)Do some operations from the primary cache-server<br>
- * 3)Stop the primary cache-server<br>
- * 4)Wait some time to allow client to failover to secondary and do some
- * operations from secondary<br>
- * 5)Verify that the invalidates stats at the client accounts for the
- * operations done by both, primary and secondary.
- *
- * @throws Exception -
- * thrown if any problem occurs in test execution
- */
- public void testBug36109() throws Exception
- {
- getLogWriter().info("testBug36109 : BEGIN");
- client1.invoke(StatsBugDUnitDisabledTest.class, "createClientCacheForInvalidates", new Object[] {
- getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2) });
- client1.invoke(StatsBugDUnitDisabledTest.class, "prepopulateClient");
- primary.invoke(StatsBugDUnitDisabledTest.class, "doEntryOperations",
- new Object[] { primaryPrefix });
- pause(3000);
- primary.invoke(StatsBugDUnitDisabledTest.class, "stopServer");
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException ignore) {
- fail("interrupted");
- }
-
- secondary.invoke(StatsBugDUnitDisabledTest.class, "doEntryOperations",
- new Object[] { secondaryPrefix });
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException ignore) {
- fail("interrupted");
- }
-
- client1.invoke(StatsBugDUnitDisabledTest.class, "verifyNumInvalidates");
- getLogWriter().info("testBug36109 : END");
- }
-
- /**
- * Creates and starts the cache-server
- *
- * @return - the port on which cache-server is running
- * @throws Exception -
- * thrown if any problem occurs in cache/server creation
- */
- public static Integer createServerCache() throws Exception
- {
- StatsBugDUnitDisabledTest test = new StatsBugDUnitDisabledTest("temp");
- Properties props = new Properties();
- cache = test.createCache(props);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setDataPolicy(DataPolicy.REPLICATE);
-
- RegionAttributes attrs = factory.create();
-
- cache.createRegion(REGION_NAME, attrs);
- CacheServer server = cache.addCacheServer();
- int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- server.setPort(port);
- server.setNotifyBySubscription(false);
- server.setSocketBufferSize(32768);
- server.start();
- getLogWriter().info("Server started at PORT = " + port);
- return new Integer(port);
- }
-
- /**
- * Initializes the cache client
- *
- * @param port1 -
- * port for the primary cache-server
- * @param port2-port
- * for the secondary cache-server
- * @throws Exception-thrown
- * if any problem occurs in initializing the client
- */
- public static void createClientCache(String host, Integer port1, Integer port2)
- throws Exception
- {
- StatsBugDUnitDisabledTest test = new StatsBugDUnitDisabledTest("temp");
- cache = test.createCache(createProperties1());
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(REGION_NAME, attrs);
- region.registerInterest("ALL_KEYS");
- getLogWriter().info("Client cache created");
- }
-
- /**
- * Initializes the cache client
- *
- * @param port1 -
- * port for the primary cache-server
- * @param port2-port
- * for the secondary cache-server
- * @throws Exception-thrown
- * if any problem occurs in initializing the client
- */
- public static void createClientCacheForInvalidates(String host, Integer port1, Integer port2)
- throws Exception
- {
- StatsBugDUnitDisabledTest test = new StatsBugDUnitDisabledTest("temp");
- cache = test.createCache(createProperties1());
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(REGION_NAME, attrs);
- region.registerInterest("ALL_KEYS", false, false);
- getLogWriter().info("Client cache created");
- }
-
- /**
- * Verify that the invalidates stats at the client accounts for the operations
- * done by both, primary and secondary.
- *
- */
- public static void verifyNumInvalidates()
- {
- long invalidatesRecordedByStats = pool.getInvalidateCount();
- getLogWriter().info(
- "invalidatesRecordedByStats = " + invalidatesRecordedByStats);
-
- int expectedInvalidates = TOTAL_SERVERS * PUTS_PER_SERVER;
- getLogWriter().info("expectedInvalidates = " + expectedInvalidates);
-
- if (invalidatesRecordedByStats != expectedInvalidates) {
- fail("Invalidates received by client(" + invalidatesRecordedByStats
- + ") does not match with the number of operations("
- + expectedInvalidates + ") done at server");
- }
- }
-
- /**
- * Stops the cache server
- *
- */
- public static void stopServer()
- {
- try {
- Iterator iter = cache.getCacheServers().iterator();
- if (iter.hasNext()) {
- CacheServer server = (CacheServer)iter.next();
- server.stop();
- }
- }
- catch (Exception e) {
- fail("failed while stopServer()" + e);
- }
- }
-
- /**
- * create properties for a loner VM
- */
- private static Properties createProperties1()
- {
- Properties props = new Properties();
- props.setProperty("mcast-port", "0");
- props.setProperty("locators", "");
- return props;
- }
-
-
- /**
- * Do PUT operations
- *
- * @param keyPrefix -
- * string prefix for the keys for all the entries do be done
- * @throws Exception -
- * thrown if any exception occurs in doing PUTs
- */
- public static void doEntryOperations(String keyPrefix) throws Exception
- {
- Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
- for (int i = 0; i < PUTS_PER_SERVER; i++) {
- r1.put(keyPrefix + i, keyPrefix + "val-" + i);
- }
- }
-
- /**
- * Prepopulate the client with the entries that will be done by cache-servers
- *
- * @throws Exception
- */
- public static void prepopulateClient() throws Exception
- {
- doEntryOperations(primaryPrefix);
- doEntryOperations(secondaryPrefix);
- }
-
- /**
- * Close the cache
- *
- */
- public static void closeCache()
- {
- if (cache != null && !cache.isClosed()) {
- cache.close();
- cache.getDistributedSystem().disconnect();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java
new file mode 100644
index 0000000..3d09f80
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.ha;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * This is a DUnit test for bug 36109. This test has a cache-client having a primary
+ * and a secondary cache-server as its endpoints. Primary does some operations
+ * and is stopped, the client fails over to secondary and does some operations
+ * and it is verified that the 'invalidates' stat at the client is the same as the
+ * total number of operations done by both primary and secondary. The bug was
+ * appearing because the invalidate stat was part of Endpoint, which used to get
+ * closed during failover, when the failed endpoint was closed. This bug
+ * has been fixed by moving the invalidate stat to be part of our implementation.
+ *
+ * @author Dinesh Patel
+ *
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class StatsBugDUnitTest extends DistributedTestCase
+{
+ /** primary cache server */
+ VM primary = null;
+
+ /** secondary cache server */
+ VM secondary = null;
+
+ /** the cache client */
+ VM client1 = null;
+
+ /** the cache */
+ private static Cache cache = null;
+
+ /** port for the primary cache server */
+ private static int PORT1;
+
+ /** port for the secondary cache server */
+ private static int PORT2;
+
+ /** name of the test region */
+ private static final String REGION_NAME = "StatsBugDUnitTest_Region";
+
+ /** bridge-writer instance (used to get connection proxy handle) */
+ private static PoolImpl pool = null;
+
+ /** total number of cache servers */
+ private static final int TOTAL_SERVERS = 2;
+
+ /** number of puts done by each server */
+ private static final int PUTS_PER_SERVER = 10;
+
+ /** prefix added to the keys of events generated on primary */
+ private static final String primaryPrefix = "primary_";
+
+ /** prefix added to the keys of events generated on secondary */
+ private static final String secondaryPrefix = "secondary_";
+
+ /**
+ * Constructor
+ *
+ * @param name -
+ * name for this test instance
+ */
+ public StatsBugDUnitTest(String name) {
+ super(name);
+ }
+
+ /**
+ * Creates the primary and the secondary cache servers
+ *
+ * @throws Exception -
+ * thrown if any problem occurs in initializing the test
+ */
+ public void setUp() throws Exception
+ {
+ disconnectAllFromDS();
+ super.setUp();
+ final Host host = Host.getHost(0);
+ primary = host.getVM(0);
+ secondary = host.getVM(1);
+ client1 = host.getVM(2);
+ PORT1 = ((Integer)primary.invoke(StatsBugDUnitTest.class,
+ "createServerCache")).intValue();
+ PORT2 = ((Integer)secondary.invoke(StatsBugDUnitTest.class,
+ "createServerCache")).intValue();
+ }
+
+ /**
+ * Create the cache
+ *
+ * @param props -
+ * properties for DS
+ * @return the cache instance
+ * @throws Exception -
+ * thrown if any problem occurs in cache creation
+ */
+ private Cache createCache(Properties props) throws Exception
+ {
+ DistributedSystem ds = getSystem(props);
+ ds.disconnect();
+ ds = getSystem(props);
+ Cache cache = null;
+ cache = CacheFactory.create(ds);
+ if (cache == null) {
+ throw new Exception("CacheFactory.create() returned null ");
+ }
+ return cache;
+ }
+
+ /**
+ * close the cache instances in server and client during tearDown
+ *
+ * @throws Exception
+ * thrown if any problem occurs in closing cache
+ */
+ public void tearDown2() throws Exception
+ {
+ super.tearDown2();
+ // close client
+ client1.invoke(StatsBugDUnitTest.class, "closeCache");
+
+ // close server
+ primary.invoke(StatsBugDUnitTest.class, "closeCache");
+ secondary.invoke(StatsBugDUnitTest.class, "closeCache");
+ }
+
+ /**
+ * This test does the following:<br>
+ * 1)Create and populate the client<br>
+ * 2)Do some operations from the primary cache-server<br>
+ * 3)Stop the primary cache-server<br>
+ * 4)Wait some time to allow client to failover to secondary and do some
+ * operations from secondary<br>
+ * 5)Verify that the invalidates stats at the client accounts for the
+ * operations done by both, primary and secondary.
+ *
+ * @throws Exception -
+ * thrown if any problem occurs in test execution
+ */
+ public void testBug36109() throws Exception
+ {
+ getLogWriter().info("testBug36109 : BEGIN");
+ client1.invoke(StatsBugDUnitTest.class, "createClientCacheForInvalidates", new Object[] {
+ getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2) });
+ client1.invoke(StatsBugDUnitTest.class, "prepopulateClient");
+ primary.invoke(StatsBugDUnitTest.class, "doEntryOperations",
+ new Object[] { primaryPrefix });
+ pause(3000);
+ primary.invoke(StatsBugDUnitTest.class, "stopServer");
+ try {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException ignore) {
+ fail("interrupted");
+ }
+
+ secondary.invoke(StatsBugDUnitTest.class, "doEntryOperations",
+ new Object[] { secondaryPrefix });
+ try {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException ignore) {
+ fail("interrupted");
+ }
+
+ client1.invoke(StatsBugDUnitTest.class, "verifyNumInvalidates");
+ getLogWriter().info("testBug36109 : END");
+ }
+
+ /**
+ * Creates and starts the cache-server
+ *
+ * @return - the port on which cache-server is running
+ * @throws Exception -
+ * thrown if any problem occurs in cache/server creation
+ */
+ public static Integer createServerCache() throws Exception
+ {
+ StatsBugDUnitTest test = new StatsBugDUnitTest("temp");
+ Properties props = new Properties();
+ cache = test.createCache(props);
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+
+ RegionAttributes attrs = factory.create();
+
+ cache.createRegion(REGION_NAME, attrs);
+ CacheServer server = cache.addCacheServer();
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ server.setPort(port);
+ server.setNotifyBySubscription(false);
+ server.setSocketBufferSize(32768);
+ server.start();
+ getLogWriter().info("Server started at PORT = " + port);
+ return new Integer(port);
+ }
+
+ /**
+ * Initializes the cache client
+ *
+ * @param port1 -
+ * port for the primary cache-server
+ * @param port2 -
+ * port for the secondary cache-server
+ * @throws Exception -
+ * thrown if any problem occurs in initializing the client
+ */
+ public static void createClientCache(String host, Integer port1, Integer port2)
+ throws Exception
+ {
+ StatsBugDUnitTest test = new StatsBugDUnitTest("temp");
+ cache = test.createCache(createProperties1());
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
+ RegionAttributes attrs = factory.create();
+ Region region = cache.createRegion(REGION_NAME, attrs);
+ region.registerInterest("ALL_KEYS");
+ getLogWriter().info("Client cache created");
+ }
+
+ /**
+ * Initializes the cache client
+ *
+ * @param port1 -
+ * port for the primary cache-server
+ * @param port2 -
+ * port for the secondary cache-server
+ * @throws Exception -
+ * thrown if any problem occurs in initializing the client
+ */
+ public static void createClientCacheForInvalidates(String host, Integer port1, Integer port2)
+ throws Exception
+ {
+ StatsBugDUnitTest test = new StatsBugDUnitTest("temp");
+ cache = test.createCache(createProperties1());
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
+ RegionAttributes attrs = factory.create();
+ Region region = cache.createRegion(REGION_NAME, attrs);
+ region.registerInterest("ALL_KEYS", false, false);
+ getLogWriter().info("Client cache created");
+ }
+
+ /**
+ * Verify that the invalidates stats at the client accounts for the operations
+ * done by both, primary and secondary.
+ *
+ */
+ public static void verifyNumInvalidates()
+ {
+ long invalidatesRecordedByStats = pool.getInvalidateCount();
+ getLogWriter().info(
+ "invalidatesRecordedByStats = " + invalidatesRecordedByStats);
+
+ int expectedInvalidates = TOTAL_SERVERS * PUTS_PER_SERVER;
+ getLogWriter().info("expectedInvalidates = " + expectedInvalidates);
+
+ if (invalidatesRecordedByStats != expectedInvalidates) {
+ fail("Invalidates received by client(" + invalidatesRecordedByStats
+ + ") does not match with the number of operations("
+ + expectedInvalidates + ") done at server");
+ }
+ }
+
+ /**
+ * Stops the cache server
+ *
+ */
+ public static void stopServer()
+ {
+ try {
+ Iterator iter = cache.getCacheServers().iterator();
+ if (iter.hasNext()) {
+ CacheServer server = (CacheServer)iter.next();
+ server.stop();
+ }
+ }
+ catch (Exception e) {
+ fail("failed while stopServer()" + e);
+ }
+ }
+
+ /**
+ * create properties for a loner VM
+ */
+ private static Properties createProperties1()
+ {
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", "");
+ return props;
+ }
+
+
+ /**
+ * Do PUT operations
+ *
+ * @param keyPrefix -
+ * string prefix for the keys for all the entries to be done
+ * @throws Exception -
+ * thrown if any exception occurs in doing PUTs
+ */
+ public static void doEntryOperations(String keyPrefix) throws Exception
+ {
+ Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+ for (int i = 0; i < PUTS_PER_SERVER; i++) {
+ r1.put(keyPrefix + i, keyPrefix + "val-" + i);
+ }
+ }
+
+ /**
+ * Prepopulate the client with the entries that will be done by cache-servers
+ *
+ * @throws Exception
+ */
+ public static void prepopulateClient() throws Exception
+ {
+ doEntryOperations(primaryPrefix);
+ doEntryOperations(secondaryPrefix);
+ }
+
+ /**
+ * Close the cache
+ *
+ */
+ public static void closeCache()
+ {
+ if (cache != null && !cache.isClosed()) {
+ cache.close();
+ cache.getDistributedSystem().disconnect();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
----------------------------------------------------------------------
diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
new file mode 100755
index 0000000..8eec738
--- /dev/null
+++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.test.junit.categories;
+/**
+ * JUnit Test Category that specifies a test executes within a container
+ * environment such as an OSGi server.
+ *
+ * @author Kirk Lund
+ */
+public interface ContainerTest {
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
----------------------------------------------------------------------
diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
new file mode 100755
index 0000000..4fe535b
--- /dev/null
+++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.test.junit.categories;
+/**
+ * JUnit Test Category that specifies a hydra test.
+ *
+ * @author Kirk Lund
+ */
+public interface HydraTest {
+}
[3/3] incubator-geode git commit: Remove Disabled from names of
tests. Ensure each test has a Category and add Ignore to any test that is
disabled due to being broken.
Posted by kl...@apache.org.
Remove Disabled from names of tests. Ensure each test has a Category and add Ignore to any test that is disabled due to being broken.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4f077657
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4f077657
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4f077657
Branch: refs/heads/feature/GEODE-714
Commit: 4f0776572e6e4d5b61ba609627401d392c537224
Parents: b417b27
Author: Kirk Lund <kl...@pivotal.io>
Authored: Thu Jan 7 13:56:09 2016 -0800
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Thu Jan 7 13:56:09 2016 -0800
----------------------------------------------------------------------
build.gradle | 14 +-
.../PutAllWithIndexPerfDUnitDisabledTest.java | 215 ---
.../index/PutAllWithIndexPerfDUnitTest.java | 221 +++
.../cache30/SlowRecDUnitDisabledTest.java | 1446 -----------------
.../gemfire/cache30/SlowRecDUnitTest.java | 1453 ++++++++++++++++++
.../distributed/DistributedMemberDUnitTest.java | 4 +
.../gemfire/distributed/KirkDUnitTest.java | 17 +
.../distributed/KirkDistributedTestSuite.java | 31 +
.../distributed/KirkIntegrationTestSuite.java | 30 +
...cpServerBackwardCompatDUnitDisabledTest.java | 250 ---
.../TcpServerBackwardCompatDUnitTest.java | 256 +++
.../cache/EvictionDUnitDisabledTest.java | 240 ---
.../internal/cache/EvictionDUnitTest.java | 246 +++
.../cache/OffHeapEvictionDUnitTest.java | 2 +-
.../cache/ha/StatsBugDUnitDisabledTest.java | 368 -----
.../internal/cache/ha/StatsBugDUnitTest.java | 374 +++++
.../test/junit/categories/ContainerTest.java | 25 +
.../test/junit/categories/HydraTest.java | 24 +
18 files changed, 2695 insertions(+), 2521 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4563590..3fff7f9 100755
--- a/build.gradle
+++ b/build.gradle
@@ -350,6 +350,9 @@ subprojects {
includeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
}
// run each test in its own vm to avoid interference issues if a test doesn't clean up
@@ -370,6 +373,8 @@ subprojects {
excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
}
beforeTest { descriptor ->
@@ -384,6 +389,9 @@ subprojects {
excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
includeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
}
forkEvery 1
@@ -403,10 +411,14 @@ subprojects {
excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
includeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+ excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
}
//I'm hoping this might deal with SOME OOMEs I've seen
- forkEvery 30
+ //forkEvery 30
+ forkEvery 1
}
// By proving a file with an arbitrary list of test classes, we can select only those
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java
deleted file mode 100644
index 22ce44b..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package com.gemstone.gemfire.cache.query.internal.index;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.ClientCache;
-import com.gemstone.gemfire.cache.client.ClientCacheFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
-import com.gemstone.gemfire.cache.query.data.PortfolioPdx;
-import com.gemstone.gemfire.cache.query.dunit.RemoteQueryDUnitTest;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.cache30.CacheTestCase;
-import com.gemstone.gemfire.internal.AvailablePort;
-
-import dunit.Host;
-import dunit.VM;
-
-/**
- * @author shobhit
- *
- */
-public class PutAllWithIndexPerfDUnitDisabledTest extends CacheTestCase {
-
- /** The port on which the bridge server was started in this VM */
- private static int bridgeServerPort;
- static long timeWithoutStructTypeIndex = 0;
- static long timeWithStructTypeIndex = 0;
-
- public PutAllWithIndexPerfDUnitDisabledTest(String name) {
- super(name);
- }
-
- public void setUp() throws Exception {
- super.setUp();
- disconnectAllFromDS();
- }
-
- public void tearDown2() throws Exception {
- try {
- super.tearDown2();
- }
- finally {
- disconnectAllFromDS();
- }
- }
-
- public void testPutAllWithIndexes() {
- final String name = "testRegion";
- final Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- final int numberOfEntries = 10000;
-
- // Start server
- vm0.invoke(new CacheSerializableRunnable("Create Bridge Server") {
- public void run2() throws CacheException {
- Properties config = new Properties();
- config.put("locators", "localhost["+getDUnitLocatorPort()+"]");
- Cache cache = new CacheFactory(config).create();
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- cache.createRegionFactory(factory.create()).create(name);
- try {
- startBridgeServer(0, false);
- } catch (Exception ex) {
- fail("While starting CacheServer", ex);
- }
- //Create Index on empty region
- try {
- cache.getQueryService().createIndex("idIndex", "ID", "/"+name);
- } catch (Exception e) {
- fail("index creation failed", e);
- }
- }
- });
-
- // Create client region
- final int port = vm0.invokeInt(PutAllWithIndexPerfDUnitDisabledTest.class, "getCacheServerPort");
- final String host0 = getServerHostName(vm0.getHost());
- vm1.invoke(new CacheSerializableRunnable("Create region") {
- public void run2() throws CacheException {
- Properties config = new Properties();
- config.setProperty("mcast-port", "0");
- ClientCache cache = new ClientCacheFactory().addPoolServer(host0, port).create();
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.LOCAL);
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
- }
- });
-
- vm1.invoke(new CacheSerializableRunnable("putAll() test") {
-
- @Override
- public void run2() throws CacheException {
- Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
-
- Map warmupMap = new HashMap();
- Map data = new HashMap();
- for(int i=0; i<10000; i++){
- Object p = new PortfolioPdx(i);
- if (i < 1000) warmupMap.put(i, p);
- data.put(i, p);
- }
-
- for (int i=0; i<10; i++) {
- exampleRegion.putAll(warmupMap);
- }
-
- long start = System.currentTimeMillis();
- for (int i=0; i<10; i++) {
- exampleRegion.putAll(data);
- }
- long end = System.currentTimeMillis();
- timeWithoutStructTypeIndex = ((end-start)/10);
- System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
-
- }
- });
-
- vm0.invoke(new CacheSerializableRunnable("Remove Index and create new one") {
-
- @Override
- public void run2() throws CacheException {
- try {
- Cache cache = CacheFactory.getAnyInstance();
- cache.getQueryService().removeIndexes();
- cache.getRegion(name).clear();
- cache.getQueryService().createIndex("idIndex", "p.ID", "/"+name+" p");
- } catch (Exception e) {
- fail("index creation failed", e);
- }
- }
- });
-
- vm1.invoke(new CacheSerializableRunnable("putAll() test") {
-
- @Override
- public void run2() throws CacheException {
- Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
- exampleRegion.clear();
- Map warmupMap = new HashMap();
- Map data = new HashMap();
- for(int i=0; i<10000; i++){
- Object p = new PortfolioPdx(i);
- if (i < 1000) warmupMap.put(i, p);
- data.put(i, p);
- }
-
- for (int i=0; i<10; i++) {
- exampleRegion.putAll(warmupMap);
- }
-
- long start = System.currentTimeMillis();
- for (int i=0; i<10; i++) {
- exampleRegion.putAll(data);
- }
- long end = System.currentTimeMillis();
- timeWithStructTypeIndex = ((end-start)/10);
- System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
-
- }
- });
-
- if (timeWithoutStructTypeIndex > timeWithStructTypeIndex) {
- fail("putAll took more time without struct type index than simple index");
- }
- }
-
- /**
- * Starts a bridge server on the given port, using the given
- * deserializeValues and notifyBySubscription to serve up the
- * given region.
- */
- protected void startBridgeServer(int port, boolean notifyBySubscription)
- throws IOException {
-
- Cache cache = CacheFactory.getAnyInstance();
- CacheServer bridge = cache.addCacheServer();
- bridge.setPort(port);
- bridge.start();
- bridgeServerPort = bridge.getPort();
- }
-
- private static int getCacheServerPort() {
- return bridgeServerPort;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java
new file mode 100644
index 0000000..5eb65c3
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.cache.query.internal.index;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.query.data.PortfolioPdx;
+import com.gemstone.gemfire.cache.query.dunit.RemoteQueryDUnitTest;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * @author shobhit
+ *
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class PutAllWithIndexPerfDUnitTest extends CacheTestCase {
+
+ /** The port on which the bridge server was started in this VM */
+ private static int bridgeServerPort;
+ static long timeWithoutStructTypeIndex = 0;
+ static long timeWithStructTypeIndex = 0;
+
+ public PutAllWithIndexPerfDUnitTest(String name) {
+ super(name);
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ disconnectAllFromDS();
+ }
+
+ public void tearDown2() throws Exception {
+ try {
+ super.tearDown2();
+ }
+ finally {
+ disconnectAllFromDS();
+ }
+ }
+
+ public void testPutAllWithIndexes() {
+ final String name = "testRegion";
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ final int numberOfEntries = 10000;
+
+ // Start server
+ vm0.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+ public void run2() throws CacheException {
+ Properties config = new Properties();
+ config.put("locators", "localhost["+getDUnitLocatorPort()+"]");
+ Cache cache = new CacheFactory(config).create();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ cache.createRegionFactory(factory.create()).create(name);
+ try {
+ startBridgeServer(0, false);
+ } catch (Exception ex) {
+ fail("While starting CacheServer", ex);
+ }
+ //Create Index on empty region
+ try {
+ cache.getQueryService().createIndex("idIndex", "ID", "/"+name);
+ } catch (Exception e) {
+ fail("index creation failed", e);
+ }
+ }
+ });
+
+ // Create client region
+ final int port = vm0.invokeInt(PutAllWithIndexPerfDUnitTest.class, "getCacheServerPort");
+ final String host0 = getServerHostName(vm0.getHost());
+ vm1.invoke(new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ Properties config = new Properties();
+ config.setProperty("mcast-port", "0");
+ ClientCache cache = new ClientCacheFactory().addPoolServer(host0, port).create();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("putAll() test") {
+
+ @Override
+ public void run2() throws CacheException {
+ Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
+
+ Map warmupMap = new HashMap();
+ Map data = new HashMap();
+ for(int i=0; i<10000; i++){
+ Object p = new PortfolioPdx(i);
+ if (i < 1000) warmupMap.put(i, p);
+ data.put(i, p);
+ }
+
+ for (int i=0; i<10; i++) {
+ exampleRegion.putAll(warmupMap);
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i=0; i<10; i++) {
+ exampleRegion.putAll(data);
+ }
+ long end = System.currentTimeMillis();
+ timeWithoutStructTypeIndex = ((end-start)/10);
+ System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
+
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Remove Index and create new one") {
+
+ @Override
+ public void run2() throws CacheException {
+ try {
+ Cache cache = CacheFactory.getAnyInstance();
+ cache.getQueryService().removeIndexes();
+ cache.getRegion(name).clear();
+ cache.getQueryService().createIndex("idIndex", "p.ID", "/"+name+" p");
+ } catch (Exception e) {
+ fail("index creation failed", e);
+ }
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("putAll() test") {
+
+ @Override
+ public void run2() throws CacheException {
+ Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
+ exampleRegion.clear();
+ Map warmupMap = new HashMap();
+ Map data = new HashMap();
+ for(int i=0; i<10000; i++){
+ Object p = new PortfolioPdx(i);
+ if (i < 1000) warmupMap.put(i, p);
+ data.put(i, p);
+ }
+
+ for (int i=0; i<10; i++) {
+ exampleRegion.putAll(warmupMap);
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i=0; i<10; i++) {
+ exampleRegion.putAll(data);
+ }
+ long end = System.currentTimeMillis();
+ timeWithStructTypeIndex = ((end-start)/10);
+ System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
+
+ }
+ });
+
+ if (timeWithoutStructTypeIndex > timeWithStructTypeIndex) {
+ fail("putAll took more time without struct type index than simple index");
+ }
+ }
+
+ /**
+ * Starts a cache server on the given port in the existing cache and
+ * stores the port reported by the started server in bridgeServerPort.
+ * The notifyBySubscription argument is currently not used.
+ */
+ protected void startBridgeServer(int port, boolean notifyBySubscription)
+ throws IOException {
+
+ Cache cache = CacheFactory.getAnyInstance();
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(port);
+ bridge.start();
+ bridgeServerPort = bridge.getPort();
+ }
+
+ private static int getCacheServerPort() {
+ return bridgeServerPort;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
deleted file mode 100644
index e669f04..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
+++ /dev/null
@@ -1,1446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache30;
-
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.cache.*;
-import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.util.*;
-import com.gemstone.gemfire.distributed.internal.*;
-import com.gemstone.gemfire.internal.tcp.Connection;
-import dunit.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Test to make sure slow receiver queuing is working
- *
- * @author darrel
- * @since 4.2.1
- */
-public class SlowRecDUnitDisabledTest extends CacheTestCase {
-
- public SlowRecDUnitDisabledTest(String name) {
- // name is handed straight to the superclass constructor
- super(name);
- }
-
- // this test has special config of its distributed system so
- // the setUp and tearDown methods need to make sure we don't
- // use the ds from previous test and that we don't leave ours around
- // for the next test to use.
-
- @Override
- public void setUp() throws Exception {
-   // Drop any distributed system left over from a previous test before the
-   // regular set-up runs; the regular set-up runs even if disconnecting fails.
-   try {
-     disconnectAllFromDS();
-   } finally {
-     super.setUp();
-   }
- }
- @Override
- public void tearDown2() throws Exception {
-   // Run the regular tear-down first, then always disconnect so the specially
-   // configured distributed system is not left around for the next test.
-   try {
-     super.tearDown2();
-   } finally {
-     disconnectAllFromDS();
-   }
- }
-
- ////////////////////// Test Methods //////////////////////
-
- private VM getOtherVm() {
-   // the receiver side of every test in this class is VM 0 of host 0
-   return Host.getHost(0).getVM(0);
- }
-
- // Last non-null callback argument seen by the listener installed in doCreateOtherVm().
- // Written from listener callbacks and polled by checkLastValueInOtherVm(), so it is
- // volatile to make the latest write visible to the polling thread.
- static protected volatile Object lastCallback = null;
-
- private void doCreateOtherVm(final Properties p, final boolean addListener) {
-   // Everything below runs in the other VM: connect with the supplied properties,
-   // create the ACK region, then create the replicated NO_ACK "slowrec" region
-   // with one of two listeners.
-   getOtherVm().invoke(new CacheSerializableRunnable("create root") {
-     public void run2() throws CacheException {
-       getSystem(p);
-       createAckRegion(true, false);
-       AttributesFactory attrs = new AttributesFactory();
-       attrs.setScope(Scope.DISTRIBUTED_NO_ACK);
-       attrs.setDataPolicy(DataPolicy.REPLICATE);
-       final CacheListener listener;
-       if (addListener) {
-         // every update is delayed by half a second to make this receiver slow
-         listener = new CacheListenerAdapter() {
-           public void afterUpdate(EntryEvent event) {
-             try {
-               Thread.sleep(500);
-             } catch (InterruptedException shuttingDown) {
-               fail("interrupted");
-             }
-           }
-         };
-       } else {
-         // remembers the last callback argument and sleeps on demand for "sleepkey"
-         listener = new CacheListenerAdapter() {
-           public void afterCreate(EntryEvent event) {
-             rememberCallback(event);
-             sleepIfRequested(event);
-           }
-           public void afterUpdate(EntryEvent event) {
-             rememberCallback(event);
-             sleepIfRequested(event);
-           }
-           public void afterInvalidate(EntryEvent event) {
-             rememberCallback(event);
-           }
-           public void afterDestroy(EntryEvent event) {
-             rememberCallback(event);
-           }
-           private void rememberCallback(EntryEvent event) {
-             if (event.getCallbackArgument() != null) {
-               lastCallback = event.getCallbackArgument();
-             }
-           }
-           private void sleepIfRequested(EntryEvent event) {
-             if (!event.getKey().equals("sleepkey")) {
-               return;
-             }
-             int sleepMs = ((Integer)event.getNewValue()).intValue();
-             try {
-               Thread.sleep(sleepMs);
-             } catch (InterruptedException ignore) {
-               fail("interrupted");
-             }
-           }
-         };
-       }
-       attrs.setCacheListener(listener);
-       Region slowrec = createRootRegion("slowrec", attrs.create());
-       // place holder so we receive updates
-       slowrec.create("key", "value");
-     }
-   });
- }
- // Sentinel passed as lastValue to checkLastValueInOtherVm(): instead of comparing values,
- // wait until the entry for "key" exists and its value is null.
- static protected final String CHECK_INVALID = "CHECK_INVALID";
-
- private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
-   // Runs in the other VM. First (optionally) waits for the expected callback argument,
-   // then verifies the state of "key" in "slowrec":
-   //   lastValue == null          -> the entry must disappear
-   //   lastValue == CHECK_INVALID -> the entry must exist with a null value
-   //   anything else              -> the entry's value must become lastValue
-   getOtherVm().invoke(new CacheSerializableRunnable("check last value") {
-     public void run2() throws CacheException {
-       final Region region = getRootRegion("slowrec");
-
-       if (lcb != null) {
-         WaitCriterion callbackSeen = new WaitCriterion() {
-           public boolean done() {
-             return lcb.equals(lastCallback);
-           }
-           public String description() {
-             return "waiting for callback";
-           }
-         };
-         DistributedTestCase.waitForCriterion(callbackSeen, 50 * 1000, 200, true);
-         assertEquals(lcb, lastCallback);
-       }
-
-       if (lastValue == null) {
-         WaitCriterion entryGone = new WaitCriterion() {
-           public boolean done() {
-             return region.getEntry("key") == null;
-           }
-           public String description() {
-             return "waiting for key to become null";
-           }
-         };
-         DistributedTestCase.waitForCriterion(entryGone, 50 * 1000, 200, true);
-         assertEquals(null, region.getEntry("key"));
-         return;
-       }
-
-       if (CHECK_INVALID.equals(lastValue)) {
-         // should be invalid
-         WaitCriterion entryInvalid = new WaitCriterion() {
-           public boolean done() {
-             Entry current = region.getEntry("key");
-             return current != null && current.getValue() == null;
-           }
-           public String description() {
-             return "waiting for invalidate";
-           }
-         };
-         DistributedTestCase.waitForCriterion(entryInvalid, 50 * 1000, 200, true);
-         return;
-       }
-
-       // poll up to 1000 times, 50 ms apart, for the expected value to arrive
-       Region.Entry entry = null;
-       Object current = null;
-       for (int attemptsLeft = 1000; attemptsLeft > 0; attemptsLeft--) {
-         entry = region.getEntry("key");
-         if (entry != null) {
-           current = entry.getValue();
-           if (current != null && current.equals(lastValue)) {
-             break;
-           }
-         }
-         try {
-           Thread.sleep(50);
-         } catch (InterruptedException ignore) {
-           fail("interrupted");
-         }
-       }
-       assertNotNull(entry);
-       assertNotNull(current);
-       assertEquals(lastValue, current);
-     }
-   });
- }
-
- private void forceQueueFlush() {
-   // stop forcing messages onto the async queue, then wait (up to 10 seconds)
-   // until the stats report no async threads
-   Connection.FORCE_ASYNC_QUEUE = false;
-   final DMStats dmStats = getSystem().getDistributionManager().getStats();
-   WaitCriterion noAsyncThreads = new WaitCriterion() {
-     public boolean done() {
-       return dmStats.getAsyncThreads() == 0;
-     }
-     public String description() {
-       return "Waiting for async threads to disappear";
-     }
-   };
-   DistributedTestCase.waitForCriterion(noAsyncThreads, 10 * 1000, 200, true);
- }
-
- private void forceQueuing(final Region r) throws CacheException {
-   // turn on forced async queuing and push one entry through the region
-   Connection.FORCE_ASYNC_QUEUE = true;
-   final DMStats dmStats = getSystem().getDistributionManager().getStats();
-   r.put("forcekey", "forcevalue");
-
-   // wait (up to 2 seconds) for the flusher to get its first flush in progress
-   WaitCriterion flushStarted = new WaitCriterion() {
-     public boolean done() {
-       return dmStats.getAsyncQueueFlushesInProgress() != 0;
-     }
-     public String description() {
-       return "waiting for flushes to start";
-     }
-   };
-   DistributedTestCase.waitForCriterion(flushStarted, 2 * 1000, 200, true);
- }
-
- /**
- * Make sure that noack puts to a receiver
- * will eventually queue and then catch up.
- */
- public void testNoAck() throws CacheException {
- // sender side: a DISTRIBUTED_NO_ACK region named "slowrec" in this VM
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DMStats stats = getSystem().getDistributionManager().getStats();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "1");
- doCreateOtherVm(p, false);
-
- // the whole queue / flush / verify cycle is performed twice; count keeps increasing
- // across both rounds so every put carries a distinct value
- int repeatCount = 2;
- int count = 0;
- while (repeatCount-- > 0) {
- forceQueuing(r);
- final Object key = "key";
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// long conflatedMsgs = stats.getAsyncConflatedMsgs();
- long queueSize = stats.getAsyncQueueSize();
- String lastValue = "";
- final long intialQueuedMsgs = queuedMsgs;
- long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
- try {
- // loop while we still have queued the initially queued msgs
- // OR the cur # of queued msgs <= 6
- while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
- String value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- // re-sample the stats after every put; they drive the loop condition
- queueSize = stats.getAsyncQueueSize();
- queuedMsgs = stats.getAsyncQueuedMsgs();
- dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- curQueuedMsgs = queuedMsgs - dequeuedMsgs;
- }
- getLogWriter().info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
- } finally {
- // always turn forced queuing back off, even if a put failed
- forceQueueFlush();
- }
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return stats.getAsyncQueueSize() == 0;
- }
- public String description() {
- return "Waiting for queues to empty";
- }
- };
- final long start = System.currentTimeMillis();
- DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
- final long finish = System.currentTimeMillis();
- getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);
-
- // the receiver must end up holding the value of the last put
- checkLastValueInOtherVm(lastValue, null);
- }
- }
- /**
- * Create a region named AckRegion with ACK scope
- */
- protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
-   // DISTRIBUTED_ACK root region "AckRegion"; replication and async conflation are opt-in
-   final AttributesFactory attrs = new AttributesFactory();
-   attrs.setScope(Scope.DISTRIBUTED_ACK);
-   if (mirror) {
-     attrs.setDataPolicy(DataPolicy.REPLICATE);
-   }
-   if (conflate) {
-     attrs.setEnableAsyncConflation(true);
-   }
-   return createRootRegion("AckRegion", attrs.create());
- }
- /**
- * Make sure that noack puts to a receiver
- * will eventually queue and then catch up with conflation
- */
- public void testNoAckConflation() throws CacheException {
-   final AttributesFactory attrs = new AttributesFactory();
-   attrs.setScope(Scope.DISTRIBUTED_NO_ACK);
-   attrs.setEnableAsyncConflation(true);
-   final Region region = createRootRegion("slowrec", attrs.create());
-   final DMStats stats = getSystem().getDistributionManager().getStats();
-
-   // create receiver in vm0 with queuing enabled
-   Properties props = new Properties();
-   props.setProperty("async-distribution-timeout", "1");
-   doCreateOtherVm(props, false);
-
-   forceQueuing(region);
-   final Object key = "key";
-   final long conflatedBefore = stats.getAsyncConflatedMsgs();
-   String lastValue = "";
-   final long dequeuedBefore = stats.getAsyncDequeuedMsgs();
-   long start = 0;
-   try {
-     // keep overwriting the same key until 1000 messages have been conflated
-     for (int count = 0; (stats.getAsyncConflatedMsgs() - conflatedBefore) < 1000; count++) {
-       lastValue = "count=" + count;
-       region.put(key, lastValue);
-     }
-     start = System.currentTimeMillis();
-   } finally {
-     forceQueueFlush();
-   }
-   final long finish = System.currentTimeMillis();
-   getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + (stats.getAsyncDequeuedMsgs() - dequeuedBefore) + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The lastValue was " + lastValue);
-
-   // the receiver must end up with the value of the final put
-   checkLastValueInOtherVm(lastValue, null);
- }
- /**
- * make sure ack does not hang
- * make sure two ack updates do not conflate but are both queued
- */
- public void testAckConflation() throws CacheException {
-   final AttributesFactory attrs = new AttributesFactory();
-   attrs.setScope(Scope.DISTRIBUTED_NO_ACK);
-   attrs.setEnableAsyncConflation(true);
-   final Region noAckRegion = createRootRegion("slowrec", attrs.create());
-   final Region ar = createAckRegion(false, true);
-   ar.create("ackKey", "ackValue");
-
-   final DMStats stats = getSystem().getDistributionManager().getStats();
-
-   // create receiver in vm0 with queuing enabled
-   Properties props = new Properties();
-   props.setProperty("async-distribution-timeout", "2");
-   doCreateOtherVm(props, false);
-
-   forceQueuing(noAckRegion);
-
-   // make sure ack does not hang
-   // make sure two ack updates do not conflate but are both queued
-   final long queuedBefore = stats.getAsyncQueuedMsgs();
-   final long conflatedBefore = stats.getAsyncConflatedMsgs();
-   // both threads perform the very same ACK put
-   final Runnable ackPut = new Runnable() {
-     public void run() {
-       ar.put("ackKey", "ackValue");
-     }
-   };
-   Thread first = new Thread(ackPut);
-   first.start();
-   Thread second = new Thread(ackPut);
-   second.start();
-   // give threads a chance to get queued
-   try {
-     Thread.sleep(100);
-   } catch (InterruptedException ignore) {
-     fail("interrupted");
-   }
-   forceQueueFlush();
-   DistributedTestCase.join(first, 2 * 1000, getLogWriter());
-   DistributedTestCase.join(second, 2 * 1000, getLogWriter());
-   final long queuedAfter = stats.getAsyncQueuedMsgs();
-   final long conflatedAfter = stats.getAsyncConflatedMsgs();
-   assertEquals(conflatedBefore, conflatedAfter);
-   // queue should be flushed by the time we get an ack
-   assertEquals(queuedAfter, stats.getAsyncDequeuedMsgs());
-   assertEquals(queuedBefore + 2, queuedAfter);
- }
- /**
- * Make sure that only sequences of updates are conflated
- * Also checks that sending to a conflating region and non-conflating region
- * does the correct thing.
- * Test disabled because it intermittently fails due to race conditions
- * in test. This has been fixed in congo's tests. See bug 35357.
- */
- public void _disabled_testConflationSequence() throws CacheException {
- // Four phases, each started with forceQueuing(r) and ended with forceQueueFlush():
- //   1. create + put + destroy            (conflated-message stat must not change)
- //   2. create + put + localDestroy       (conflated-message stat must not change)
- //   3. put + invalidate                  (conflated-message stat must not change)
- //   4. puts to a conflating and a non-conflating region, checked stat by stat
- // NOTE(review): unlike testNoAck(), no try/finally guards the phases, so a failed
- // assertion leaves Connection.FORCE_ASYNC_QUEUE set to true.
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r = createRootRegion("slowrec", factory.create());
- // same factory, conflation switched off, for the second region
- factory.setEnableAsyncConflation(false);
- final Region noConflate = createRootRegion("noConflate", factory.create());
- final DMStats stats = getSystem().getDistributionManager().getStats();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "1");
- doCreateOtherVm(p, false);
- {
- VM vm = getOtherVm();
- vm.invoke(new CacheSerializableRunnable("create noConflate") {
- public void run2() throws CacheException {
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
- createRootRegion("noConflate", af.create());
- }
- });
- }
-
- // now make sure update+destroy does not conflate
- final Object key = "key";
- getLogWriter().info("[testConflationSequence] about to force queuing");
- forceQueuing(r);
-
- int count = 0;
- String value = "";
- String lastValue = value;
- Object mylcb = null;
- long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- int endCount = count+60;
-
- getLogWriter().info("[testConflationSequence] about to build up queue");
- long begin = System.currentTimeMillis();
- while (count < endCount) {
- value = "count=" + count;
- lastValue = value;
- r.create(key, value);
- count ++;
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- // the destroy carries the last put value as its callback argument
- mylcb = value;
- r.destroy(key, mylcb);
- count ++;
- lastValue = null;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- // give up if the loop has been running for two minutes
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
- assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
- forceQueueFlush();
- checkLastValueInOtherVm(lastValue, mylcb);
-
- // now make sure create+update+localDestroy does not conflate
- getLogWriter().info("[testConflationSequence] force queuing create-update-destroy");
- forceQueuing(r);
- initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- endCount = count + 40;
-
- getLogWriter().info("[testConflationSequence] create-update-destroy");
- begin = System.currentTimeMillis();
- while (count < endCount) {
- value = "count=" + count;
- lastValue = value;
- r.create(key, value);
- count++;
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- r.localDestroy(key);
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
- assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
- forceQueueFlush();
- checkLastValueInOtherVm(lastValue, null);
-
- // now make sure update+invalidate does not conflate
- getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
- forceQueuing(r);
- initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
- value = "count=" + count;
- lastValue = value;
- r.create(key, value);
- count++;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- endCount = count + 40;
-
- getLogWriter().info("[testConflationSequence] update-invalidate");
- begin = System.currentTimeMillis();
- while (count < endCount) {
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- r.invalidate(key);
- count ++;
- // tells checkLastValueInOtherVm() to expect an entry with a null value
- lastValue = CHECK_INVALID;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
- assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
- forceQueueFlush();
- getLogWriter().info("[testConflationSequence] assert other vm");
- checkLastValueInOtherVm(lastValue, null);
-
- r.destroy(key);
-
- // now make sure updates to a conflating region are conflated even while
- // updates to a non-conflating are not.
- getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
- forceQueuing(r);
- final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
-// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-
- value = "count=" + count;
- lastValue = value;
- long conflatedMsgs = stats.getAsyncConflatedMsgs();
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- // the first create/put on each region must be queued (queued stat +1, conflated stat unchanged)
- r.create(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- r.put(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- noConflate.create(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- noConflate.put(key, value);
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- count++;
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- endCount = count + 80;
-
- begin = System.currentTimeMillis();
- getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
- + " queuedMsgs=" + stats.getAsyncQueuedMsgs()
- + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
- + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
- + " asyncSocketWrites=" + stats.getAsyncSocketWrites()
- );
- while (count < endCount) {
- // make sure we continue to have a flush in progress
- assertEquals(1, stats.getAsyncThreads());
- assertEquals(1, stats.getAsyncQueues());
- assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
- // make sure we are not completing any flushing while this loop is in progress
- assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
- value = "count=" + count;
- lastValue = value;
- r.put(key, value);
- count ++;
- // make sure it was conflated and not queued
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- conflatedMsgs++;
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
- noConflate.put(key, value);
- // make sure it was queued and not conflated
- queuedMsgs++;
- assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
- assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
- assertTrue(System.currentTimeMillis() < begin+1000*60*2);
- }
-
- forceQueueFlush();
- getLogWriter().info("[testConflationSequence] assert other vm");
- checkLastValueInOtherVm(lastValue, null);
- }
- /**
- * Make sure that exceeding the queue size limit causes a disconnect.
- */
- public void testSizeDisconnect() throws CacheException {
- // log-file markers that bracket the section in which the listed exceptions are expected
- final String expected =
- "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
- "||java.io.IOException: Broken pipe";
- final String addExpected =
- "<ExpectedException action=add>" + expected + "</ExpectedException>";
- final String removeExpected =
- "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
- // set others before vm0 connects
- final Set others = dm.getOtherDistributionManagerIds();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-max-queue-size", "1"); // 1 meg
- doCreateOtherVm(p, false);
-
-
- final Object key = "key";
- final int VALUE_SIZE = 1024 * 100; // 100K per value; a 1 meg async-max-queue-size should hold about 10 of these before the queue is full
- final byte[] value = new byte[VALUE_SIZE];
- int count = 0;
- forceQueuing(r);
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- long queueSize = stats.getAsyncQueueSize();
-
- getCache().getLogger().info(addExpected);
- try {
- // keep putting until the stats report either a size-exceeded or a timeout event
- while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
- r.put(key, value);
- count ++;
- // remember the last non-empty queue stats for the log message below
- if (stats.getAsyncQueueSize() > 0) {
- queuedMsgs = stats.getAsyncQueuedMsgs();
- queueSize = stats.getAsyncQueueSize();
- }
- if (count > 100) {
- fail("should have exceeded max-queue-size by now");
- }
- }
- getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
- // make sure we lost a connection to vm0
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return dm.getOtherDistributionManagerIds().size() <= others.size()
- && stats.getAsyncQueueSize() == 0;
- }
- public String description() {
- return "waiting for connection loss";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
- }
- finally {
- // always switch forced queuing off and close the expected-exception section
- forceQueueFlush();
- getCache().getLogger().info(removeExpected);
- }
- // membership must be back to what it was before vm0 connected, with nothing left queued
- assertEquals(others, dm.getOtherDistributionManagerIds());
- assertEquals(0, stats.getAsyncQueueSize());
- }
- /**
- * Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
- * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
- * in build.xml in the splitbrainNov07 branch. It had been disabled since
- * June 2006 due to hangs. Some of the tests, like this one, still need
- * work because they periodically (some quite often) fail.
- */
- public void donottestTimeoutDisconnect() throws CacheException {
- final String expected =
- "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
- "||java.io.IOException: Broken pipe";
- final String addExpected =
- "<ExpectedException action=add>" + expected + "</ExpectedException>";
- final String removeExpected =
- "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
- // set others before vm0 connects
- final Set others = dm.getOtherDistributionManagerIds();
-
- // create receiver in vm0 with queuing enabled
- Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-queue-timeout", "500"); // 500 ms
- doCreateOtherVm(p, true);
-
-
- final Object key = "key";
- final int VALUE_SIZE = 1024; // 1k
- final byte[] value = new byte[VALUE_SIZE];
- int count = 0;
- long queuedMsgs = stats.getAsyncQueuedMsgs();
- long queueSize = stats.getAsyncQueueSize();
- final long timeoutLimit = System.currentTimeMillis() + 5000;
-
- getCache().getLogger().info(addExpected);
- try {
- while (stats.getAsyncQueueTimeouts() == 0) {
- r.put(key, value);
- count ++;
- if (stats.getAsyncQueueSize() > 0) {
- queuedMsgs = stats.getAsyncQueuedMsgs();
- queueSize = stats.getAsyncQueueSize();
- }
- if (System.currentTimeMillis() > timeoutLimit) {
- fail("should have exceeded async-queue-timeout by now");
- }
- }
- getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
- // make sure we lost a connection to vm0
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- if (dm.getOtherDistributionManagerIds().size() > others.size()) {
- return false;
- }
- return stats.getAsyncQueueSize() == 0;
- }
- public String description() {
- return "waiting for departure";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
- }
- finally {
- getCache().getLogger().info(removeExpected);
- }
- assertEquals(others, dm.getOtherDistributionManagerIds());
- assertEquals(0, stats.getAsyncQueueSize());
- }
-
- // static helper methods ---------------------------------------------------
-
- // Entry keys that make ControlListener.processEvent() sleep, wait or disconnect.
- private static final String KEY_SLEEP = "KEY_SLEEP";
- private static final String KEY_WAIT = "KEY_WAIT";
- private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
-
- // Kind of listener callback that delivered a callback argument.
- protected final static int CALLBACK_CREATE = 0;
- protected final static int CALLBACK_UPDATE = 1;
- protected final static int CALLBACK_INVALIDATE = 2;
- protected final static int CALLBACK_DESTROY = 3;
- protected final static int CALLBACK_REGION_INVALIDATE = 4;
-
- // Boxed forms of the callback kinds above; Integer.valueOf is used instead of the
- // Integer constructor so no fresh instance is allocated for these small values.
- protected final static Integer CALLBACK_CREATE_INTEGER = Integer.valueOf(CALLBACK_CREATE);
- protected final static Integer CALLBACK_UPDATE_INTEGER = Integer.valueOf(CALLBACK_UPDATE);
- protected final static Integer CALLBACK_INVALIDATE_INTEGER = Integer.valueOf(CALLBACK_INVALIDATE);
- protected final static Integer CALLBACK_DESTROY_INTEGER = Integer.valueOf(CALLBACK_DESTROY);
- protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = Integer.valueOf(CALLBACK_REGION_INVALIDATE);
-
- /**
-  * Immutable pair of a callback argument and the kind of callback
-  * (one of the CALLBACK_* int constants) that delivered it.
-  */
- private static class CallbackWrapper {
-   public final Object callbackArgument;
-   public final int callbackType;
-   public CallbackWrapper(Object callbackArgument, int callbackType) {
-     this.callbackArgument = callbackArgument;
-     this.callbackType = callbackType;
-   }
-   @Override
-   public String toString() {
-     // plain concatenation renders a null argument as "null" instead of throwing
-     return "CallbackWrapper: " + callbackArgument + " of type " + callbackType;
-   }
- }
-
- /**
-  * Listener that records every non-null callback argument it receives (together with
-  * the kind of callback) and notifies waiters on CONTROL_LOCK. Create and update events
-  * for the keys KEY_SLEEP, KEY_WAIT and KEY_DISCONNECT additionally make the listener
-  * sleep, wait on CONTROL_LOCK, or disconnect from the distributed system.
-  */
- protected static class ControlListener extends CacheListenerAdapter {
-   public final LinkedList callbackArguments = new LinkedList();
-   public final LinkedList callbackTypes = new LinkedList();
-   public final Object CONTROL_LOCK = new Object();
-
-   public void afterCreate(EntryEvent event) {
-     getLogWriter().info(event.getRegion().getName() + " afterCreate " + event.getKey());
-     recordCallback(event.getCallbackArgument(), CALLBACK_CREATE, CALLBACK_CREATE_INTEGER);
-     processEvent(event);
-   }
-
-   public void afterUpdate(EntryEvent event) {
-     getLogWriter().info(event.getRegion().getName() + " afterUpdate " + event.getKey());
-     recordCallback(event.getCallbackArgument(), CALLBACK_UPDATE, CALLBACK_UPDATE_INTEGER);
-     processEvent(event);
-   }
-
-   public void afterInvalidate(EntryEvent event) {
-     recordCallback(event.getCallbackArgument(), CALLBACK_INVALIDATE, CALLBACK_INVALIDATE_INTEGER);
-   }
-
-   public void afterDestroy(EntryEvent event) {
-     recordCallback(event.getCallbackArgument(), CALLBACK_DESTROY, CALLBACK_DESTROY_INTEGER);
-   }
-
-   public void afterRegionInvalidate(RegionEvent event) {
-     recordCallback(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE,
-         CALLBACK_REGION_INVALIDATE_INTEGER);
-   }
-
-   /** Stores a non-null callback argument with its kind and wakes up waiters. */
-   private void recordCallback(Object argument, int type, Integer boxedType) {
-     synchronized (this.CONTROL_LOCK) {
-       if (argument == null) {
-         return;
-       }
-       this.callbackArguments.add(new CallbackWrapper(argument, type));
-       this.callbackTypes.add(boxedType);
-       this.CONTROL_LOCK.notifyAll();
-     }
-   }
-
-   /** Dispatches on the entry key to one of the control actions. */
-   private void processEvent(EntryEvent event) {
-     final Object key = event.getKey();
-     if (key.equals(KEY_SLEEP)) {
-       processSleep(event);
-     } else if (key.equals(KEY_WAIT)) {
-       processWait(event);
-     } else if (key.equals(KEY_DISCONNECT)) {
-       processDisconnect(event);
-     }
-   }
-
-   /** Sleeps for the number of milliseconds carried as the event's new value. */
-   private void processSleep(EntryEvent event) {
-     final int millis = ((Integer)event.getNewValue()).intValue();
-     getLogWriter().info("[processSleep] sleeping for " + millis);
-     try {
-       Thread.sleep(millis);
-     } catch (InterruptedException ignore) {
-       fail("interrupted");
-     }
-   }
-
-   /** Waits on CONTROL_LOCK for at most the milliseconds carried as the new value. */
-   private void processWait(EntryEvent event) {
-     final int millis = ((Integer)event.getNewValue()).intValue();
-     getLogWriter().info("[processWait] waiting for " + millis);
-     synchronized (this.CONTROL_LOCK) {
-       try {
-         this.CONTROL_LOCK.wait(millis);
-       } catch (InterruptedException ignore) {
-         // an interrupt simply ends the wait
-       }
-     }
-   }
-
-   private void processDisconnect(EntryEvent event) {
-     getLogWriter().info("[processDisconnect] disconnecting");
-     disconnectFromDS();
-   }
- }
-
- /**
- * Make sure multiple no-ack regions conflate properly.
- * [bruce] disabled when use of this dunit test class was reenabled in
- * the splitbrainNov07 branch. The class had been disabled since
- * June 2006 r13222 in the trunk. This test is failing because conflation
- * isn't kicking in for some reason.
- */
- public void donottestMultipleRegionConflation() throws Throwable {
- try {
- doTestMultipleRegionConflation();
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable t) {
- getLogWriter().error("Encountered exception: ", t);
- throw t;
- }
- finally {
- // make sure other vm was notified even if test failed
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
- }
- }
- protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
- protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
- private void doTestMultipleRegionConflation() throws Exception {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r1 = createRootRegion("slowrec1", factory.create());
- final Region r2 = createRootRegion("slowrec2", factory.create());
-
- assertTrue(getSystem().isConnected());
- assertNotNull(r1);
- assertFalse(r1.isDestroyed());
- assertNotNull(getCache());
- assertNotNull(getCache().getRegion("slowrec1"));
- assertNotNull(r2);
- assertFalse(r2.isDestroyed());
- assertNotNull(getCache());
- assertNotNull(getCache().getRegion("slowrec2"));
-
- final DM dm = getSystem().getDistributionManager();
- final Serializable controllerVM = dm.getDistributionManagerId();
- final DMStats stats = dm.getStats();
- final int millisToWait = 1000 * 60 * 5; // 5 minutes
-
- // set others before vm0 connects
- long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-
- // create receiver in vm0 with queuing enabled
- final Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-queue-timeout", "86400000"); // max value
- p.setProperty("async-max-queue-size", "1024"); // max value
-
- getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
- public void run2() throws CacheException {
- getSystem(p);
-
- DM dm = getSystem().getDistributionManager();
- assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
-
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
-
- doTestMultipleRegionConflation_R1_Listener = new ControlListener();
- af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
- createRootRegion("slowrec1", af.create());
-
- doTestMultipleRegionConflation_R2_Listener = new ControlListener();
- af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
- createRootRegion("slowrec2", af.create());
- }
- });
-
- // put vm0 cache listener into wait
- getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 into wait");
- r1.put(KEY_WAIT, new Integer(millisToWait));
-
- // build up queue size
- getLogWriter().info("[doTestMultipleRegionConflation] building up queue size...");
- final Object key = "key";
- final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
- final int VALUE_SIZE = socketBufferSize*3;
- //final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
- final byte[] value = new byte[VALUE_SIZE];
-
- int count = 0;
- while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
- count++;
- r1.put(key, value);
- }
-
- getLogWriter().info("[doTestMultipleRegionConflation] After " +
- count + " puts of size " + VALUE_SIZE +
- " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
- // put values that will be asserted
- final Object key1 = "key1";
- final Object key2 = "key2";
- Object putKey = key1;
- boolean flag = true;
- for (int i = 0; i < 30; i++) {
- if (i == 10) putKey = key2;
- if (flag) {
- if (i == 6) {
- r1.invalidate(putKey, new Integer(i));
- } else if (i == 24) {
- r1.invalidateRegion(new Integer(i));
- } else {
- r1.put(putKey, value, new Integer(i));
- }
- } else {
- if (i == 15) {
- r2.destroy(putKey, new Integer(i));
- } else {
- r2.put(putKey, value, new Integer(i));
- }
- }
- flag = !flag;
- }
-
- // r1: key1, 0, create
- // r1: key1, 4, update
- // r1: key1, 6, invalidate
- // r1: key1, 8, update
-
- // r1: key2, 10, create
- // r1: 24, invalidateRegion
- // r1: key2, 28, update
-
- // r2: key1, 1, create
- // r2: key1, 9, update
-
- // r2: key2, 11, create
- // r2: key2, 13, update
- // r2: key2, 15, destroy
- // r2: key2, 17, create
- // r2: key2, 29, update
-
- final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 };
- final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
- { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
- CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE };
-
- final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
- final int[] r2ExpectedTypes = new int[]
- { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
- CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE };
-
- // send notify to vm0
- getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
-
- // wait for queue to be flushed
- getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
- getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
- public void run() {
- try {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- while (doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
- doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- }
- synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
- while (doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
- doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- }
- } catch (InterruptedException ignore) {fail("interrupted");}
- }
- });
-
- // assert values on both listeners
- getLogWriter().info("[doTestMultipleRegionConflation] assert callback arguments");
- getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
- public void run() {
- synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
- getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
- getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
- assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
- doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
- int i = 0;
- for (Iterator iter = doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter.hasNext();) {
- CallbackWrapper wrapper = (CallbackWrapper) iter.next();
- assertEquals(new Integer(r1ExpectedArgs[i]),
- wrapper.callbackArgument);
- assertEquals(new Integer(r1ExpectedTypes[i]),
- doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
- i++;
- }
- }
- synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
- getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
- getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
- assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
- doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
- int i = 0;
- for (Iterator iter = doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter.hasNext();) {
- CallbackWrapper wrapper = (CallbackWrapper) iter.next();
- assertEquals(new Integer(r2ExpectedArgs[i]),
- wrapper.callbackArgument);
- assertEquals(new Integer(r2ExpectedTypes[i]),
- doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
- i++;
- }
- }
- }
- });
- }
-
- /**
- * Make sure a disconnect causes queue memory to be released.
- */
- public void testDisconnectCleanup() throws Throwable {
- try {
- doTestDisconnectCleanup();
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable t) {
- getLogWriter().error("Encountered exception: ", t);
- throw t;
- }
- finally {
- // make sure other vm was notified even if test failed
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
- doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
- }
- }
- protected static ControlListener doTestDisconnectCleanup_Listener;
- private void doTestDisconnectCleanup() throws Exception {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
- // set others before vm0 connects
- final Set others = dm.getOtherDistributionManagerIds();
- long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
- final int initialQueues = stats.getAsyncQueues();
-
- // create receiver in vm0 with queuing enabled
- final Properties p = new Properties();
- p.setProperty("async-distribution-timeout", "5");
- p.setProperty("async-queue-timeout", "86400000"); // max value
- p.setProperty("async-max-queue-size", "1024"); // max value
-
- getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
- public void run2() throws CacheException {
- getSystem(p);
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
-
- doTestDisconnectCleanup_Listener = new ControlListener();
- af.setCacheListener(doTestDisconnectCleanup_Listener);
- createRootRegion("slowrec", af.create());
- }
- });
-
- // put vm0 cache listener into wait
- getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
- int millisToWait = 1000 * 60 * 5; // 5 minutes
- r.put(KEY_WAIT, new Integer(millisToWait));
- r.put(KEY_DISCONNECT, KEY_DISCONNECT);
-
- // build up queue size
- getLogWriter().info("[testDisconnectCleanup] building up queue size...");
- final Object key = "key";
- final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
- final int VALUE_SIZE = socketBufferSize*3;
- //final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
- final byte[] value = new byte[VALUE_SIZE];
-
- int count = 0;
- final long abortMillis = System.currentTimeMillis() + millisToWait;
- while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
- count++;
- r.put(key, value);
- assertFalse(System.currentTimeMillis() >= abortMillis);
- }
-
- getLogWriter().info("[testDisconnectCleanup] After " +
- count + " puts of size " + VALUE_SIZE +
- " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
- while (stats.getAsyncQueuedMsgs() < 10 ||
- stats.getAsyncQueueSize() < VALUE_SIZE*10) {
- count++;
- r.put(key, value);
- assertFalse(System.currentTimeMillis() >= abortMillis);
- }
- assertTrue(stats.getAsyncQueuedMsgs() >= 10);
-
- while (stats.getAsyncQueues() < 1) {
- pause(100);
- assertFalse(System.currentTimeMillis() >= abortMillis);
- }
-
- getLogWriter().info("[testDisconnectCleanup] After " +
- count + " puts of size " + VALUE_SIZE + " queue size has reached " +
- stats.getAsyncQueueSize() + " bytes and number of queues is " +
- stats.getAsyncQueues() + ".");
-
- assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
- assertEquals(initialQueues+1, stats.getAsyncQueues());
-
- // assert vm0 is still connected
- assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
-
- // send notify to vm0
- getLogWriter().info("[testDisconnectCleanup] wake up vm0");
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
- doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
-
- // make sure we lost a connection to vm0
- getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- return dm.getOtherDistributionManagerIds().size() <= others.size();
- }
- public String description() {
- return "waiting for disconnect";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
- assertEquals(others, dm.getOtherDistributionManagerIds());
-
- // check free memory... perform wait loop with System.gc
- getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
- ev = new WaitCriterion() {
- public boolean done() {
- if (stats.getAsyncQueues() <= initialQueues) {
- return true;
- }
- Runtime.getRuntime().gc();
- return false;
- }
- public String description() {
- return "waiting for queue cleanup";
- }
- };
- DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
-// getLogWriter().info("[testDisconnectCleanup] initialQueues=" +
-// initialQueues + " asyncQueues=" + stats.getAsyncQueues());
- assertEquals(initialQueues, stats.getAsyncQueues());
- }
-
- /**
- * Make sure a disconnect causes queue memory to be released.<p>
- * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
- * in build.xml in the splitbrainNov07 branch. It had been disabled since
- * June 2006 due to hangs. Some of the tests, like this one, still need
- * work because the periodically (some quite often) fail.
- */
- public void donottestPartialMessage() throws Throwable {
- try {
- doTestPartialMessage();
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- }
- catch (Throwable t) {
- getLogWriter().error("Encountered exception: ", t);
- throw t;
- }
- finally {
- // make sure other vm was notified even if test failed
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
- }
- }
- });
- }
- }
- protected static ControlListener doTestPartialMessage_Listener;
- private void doTestPartialMessage() throws Exception {
- final AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_NO_ACK);
- factory.setEnableAsyncConflation(true);
- final Region r = createRootRegion("slowrec", factory.create());
- final DM dm = getSystem().getDistributionManager();
- final DMStats stats = dm.getStats();
-
- // set others before vm0 connects
-// final Set others = dm.getOtherDistributionManagerIds();
- long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-// int initialQueues = stats.getAsyncQueues();
-
- // create receiver in vm0 with queuing enabled
- final Properties p = new Properties();
- p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4 sec
- p.setProperty("async-queue-timeout", "86400000"); // max value
- p.setProperty("async-max-queue-size", "1024"); // max value
-
- getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
- public void run2() throws CacheException {
- getSystem(p);
- AttributesFactory af = new AttributesFactory();
- af.setScope(Scope.DISTRIBUTED_NO_ACK);
- af.setDataPolicy(DataPolicy.REPLICATE);
-
- doTestPartialMessage_Listener = new ControlListener();
- af.setCacheListener(doTestPartialMessage_Listener);
- createRootRegion("slowrec", af.create());
- }
- });
-
- // put vm0 cache listener into wait
- getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
- final int millisToWait = 1000 * 60 * 5; // 5 minutes
- r.put(KEY_WAIT, new Integer(millisToWait));
-
- // build up queue size
- getLogWriter().info("[testPartialMessage] building up queue size...");
- final Object key = "key";
- final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
- final int VALUE_SIZE = socketBufferSize*3;
- //1024 * 20; // 20 KB
- final byte[] value = new byte[VALUE_SIZE];
-
- int count = 0;
- while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
- count++;
- r.put(key, value, new Integer(count));
- }
-
- final int partialId = count;
- assertEquals(0, stats.getAsyncConflatedMsgs());
-
- getLogWriter().info("[testPartialMessage] After " +
- count + " puts of size " + VALUE_SIZE +
- " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
- pause(2000);
-
- // conflate 10 times
- while (stats.getAsyncConflatedMsgs() < 10) {
- count++;
- r.put(key, value, new Integer(count));
- if (count == partialId+1) {
-// long begin = System.currentTimeMillis();
-// while (stats.getAsyncQueues() < 1) {
-// pause(100);
-// assertFalse(System.currentTimeMillis() > begin+1000*10);
-// }
- assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
- assertEquals(0, stats.getAsyncConflatedMsgs());
- } else if (count == partialId+2) {
- assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
- assertEquals(1, stats.getAsyncConflatedMsgs());
- }
- }
-
- final int conflateId = count;
-
- final int[] expectedArgs = { partialId, conflateId };
-
- // send notify to vm0
- getLogWriter().info("[testPartialMessage] wake up vm0");
- getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
- public void run() {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- doTestPartialMessage_Listener.CONTROL_LOCK.notify();
- }
- }
- });
-
- // wait for queue to be flushed
- getLogWriter().info("[testPartialMessage] wait for vm0");
- getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
- public void run() {
- try {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- boolean done = false;
- while (!done) {
- if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
- CallbackWrapper last = (CallbackWrapper)
- doTestPartialMessage_Listener.callbackArguments.getLast();
- Integer lastId = (Integer) last.callbackArgument;
- if (lastId.intValue() == conflateId) {
- done = true;
- } else {
- doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- } else {
- doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
- }
- }
- }
- } catch (InterruptedException ignore) {fail("interrupted");}
- }
- });
-
- // assert values on both listeners
- getLogWriter().info("[testPartialMessage] assert callback arguments");
- getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
- public void run() {
- synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
- getLogWriter().info("[testPartialMessage] " +
- "doTestPartialMessage_Listener.callbackArguments=" +
- doTestPartialMessage_Listener.callbackArguments);
-
- assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
- doTestPartialMessage_Listener.callbackTypes.size());
-
- int i = 0;
- Iterator argIter =
- doTestPartialMessage_Listener.callbackArguments.iterator();
- Iterator typeIter =
- doTestPartialMessage_Listener.callbackTypes.iterator();
-
- while (argIter.hasNext()) {
- CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
- Integer arg = (Integer) wrapper.callbackArgument;
- typeIter.next(); // Integer type
- if (arg.intValue() < partialId) {
- continue;
- }
- assertEquals(new Integer(expectedArgs[i]), arg);
- //assertEquals(CALLBACK_UPDATE_INTEGER, type);
- i++;
- }
- }
- }
- });
-
- }
-}
-
[2/3] incubator-geode git commit: Remove Disabled from names of
tests. Ensure each test has a Category and add Ignore to any test that is
disabled due to being broken.
Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
new file mode 100644
index 0000000..e79eddc
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.util.*;
+import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.internal.tcp.Connection;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.*;
+
+import java.io.*;
+import java.util.*;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to make sure slow receiver queuing is working
+ *
+ * @author darrel
+ * @since 4.2.1
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class SlowRecDUnitTest extends CacheTestCase {
+
+ public SlowRecDUnitTest(String name) {
+ super(name);
+ }
+
+ // This test uses a specially configured distributed system, so
+ // the setUp and tearDown methods need to make sure that we don't
+ // reuse the ds from a previous test and that we don't leave ours
+ // around for the next test to use.
+
+ public void setUp() throws Exception {
+ try {
+ disconnectAllFromDS();
+ } finally {
+ super.setUp();
+ }
+ }
+ public void tearDown2() throws Exception {
+ try {
+ super.tearDown2();
+ } finally {
+ disconnectAllFromDS();
+ }
+ }
+
+ ////////////////////// Test Methods //////////////////////
+
+ private VM getOtherVm() {
+ Host host = Host.getHost(0);
+ return host.getVM(0);
+ }
+
+ static protected Object lastCallback = null;
+
+ private void doCreateOtherVm(final Properties p, final boolean addListener) {
+ VM vm = getOtherVm();
+ vm.invoke(new CacheSerializableRunnable("create root") {
+ public void run2() throws CacheException {
+ getSystem(p);
+ createAckRegion(true, false);
+ AttributesFactory af = new AttributesFactory();
+ af.setScope(Scope.DISTRIBUTED_NO_ACK);
+ af.setDataPolicy(DataPolicy.REPLICATE);
+ if (addListener) {
+ CacheListener cl = new CacheListenerAdapter() {
+ public void afterUpdate(EntryEvent event) {
+ // make the slow receiver event slower!
+ try {Thread.sleep(500);} catch (InterruptedException shuttingDown) {fail("interrupted");}
+ }
+ };
+ af.setCacheListener(cl);
+ } else {
+ CacheListener cl = new CacheListenerAdapter() {
+ public void afterCreate(EntryEvent event) {
+// getLogWriter().info("afterCreate " + event.getKey());
+ if (event.getCallbackArgument() != null) {
+ lastCallback = event.getCallbackArgument();
+ }
+ if (event.getKey().equals("sleepkey")) {
+ int sleepMs = ((Integer)event.getNewValue()).intValue();
+// getLogWriter().info("sleepkey sleeping for " + sleepMs);
+ try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ }
+ public void afterUpdate(EntryEvent event) {
+// getLogWriter().info("afterUpdate " + event.getKey());
+ if (event.getCallbackArgument() != null) {
+ lastCallback = event.getCallbackArgument();
+ }
+ if (event.getKey().equals("sleepkey")) {
+ int sleepMs = ((Integer)event.getNewValue()).intValue();
+// getLogWriter().info("sleepkey sleeping for " + sleepMs);
+ try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ }
+ public void afterInvalidate(EntryEvent event) {
+ if (event.getCallbackArgument() != null) {
+ lastCallback = event.getCallbackArgument();
+ }
+ }
+ public void afterDestroy(EntryEvent event) {
+ if (event.getCallbackArgument() != null) {
+ lastCallback = event.getCallbackArgument();
+ }
+ }
+ };
+ af.setCacheListener(cl);
+ }
+ Region r1 = createRootRegion("slowrec", af.create());
+ // place holder so we receive updates
+ r1.create("key", "value");
+ }
+ });
+ }
+ static protected final String CHECK_INVALID = "CHECK_INVALID";
+
+ private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
+ VM vm = getOtherVm();
+ vm.invoke(new CacheSerializableRunnable("check last value") {
+ public void run2() throws CacheException {
+ Region r1 = getRootRegion("slowrec");
+ if (lcb != null) {
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return lcb.equals(lastCallback);
+ }
+ public String description() {
+ return "waiting for callback";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+ assertEquals(lcb, lastCallback);
+ }
+ if (lastValue == null) {
+ final Region r = r1;
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return r.getEntry("key") == null;
+ }
+ public String description() {
+ return "waiting for key to become null";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+ assertEquals(null, r1.getEntry("key"));
+ } else if (CHECK_INVALID.equals(lastValue)) {
+ // should be invalid
+ {
+ final Region r = r1;
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ Entry e = r.getEntry("key");
+ if (e == null) {
+ return false;
+ }
+ return e.getValue() == null;
+ }
+ public String description() {
+ return "waiting for invalidate";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+// assertNotNull(re);
+// assertEquals(null, value);
+ }
+ } else {
+ {
+ int retryCount = 1000;
+ Region.Entry re = null;
+ Object value = null;
+ while (retryCount-- > 0) {
+ re = r1.getEntry("key");
+ if (re != null) {
+ value = re.getValue();
+ if (value != null && value.equals(lastValue)) {
+ break;
+ }
+ }
+ try {Thread.sleep(50);} catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ assertNotNull(re);
+ assertNotNull(value);
+ assertEquals(lastValue, value);
+ }
+ }
+ }
+ });
+ }
+
+ private void forceQueueFlush() {
+ Connection.FORCE_ASYNC_QUEUE=false;
+ final DMStats stats = getSystem().getDistributionManager().getStats();
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return stats.getAsyncThreads() == 0;
+ }
+ public String description() {
+ return "Waiting for async threads to disappear";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
+ }
+
+ private void forceQueuing(final Region r) throws CacheException {
+ Connection.FORCE_ASYNC_QUEUE=true;
+ final DMStats stats = getSystem().getDistributionManager().getStats();
+ r.put("forcekey", "forcevalue");
+
+ // wait for the flusher to get its first flush in progress
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return stats.getAsyncQueueFlushesInProgress() != 0;
+ }
+ public String description() {
+ return "waiting for flushes to start";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+ }
+
+ /**
+ * Make sure that noack puts to a receiver
+ * will eventually queue and then catch up.
+ */
+ public void testNoAck() throws CacheException {
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ final Region r = createRootRegion("slowrec", factory.create());
+ final DMStats stats = getSystem().getDistributionManager().getStats();
+
+ // create receiver in vm0 with queuing enabled
+ Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", "1");
+ doCreateOtherVm(p, false);
+
+ int repeatCount = 2;
+ int count = 0;
+ while (repeatCount-- > 0) {
+ forceQueuing(r);
+ final Object key = "key";
+ long queuedMsgs = stats.getAsyncQueuedMsgs();
+ long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+// long conflatedMsgs = stats.getAsyncConflatedMsgs();
+ long queueSize = stats.getAsyncQueueSize();
+ String lastValue = "";
+ final long intialQueuedMsgs = queuedMsgs;
+ long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
+ try {
+ // loop while fewer msgs have been dequeued than were initially queued
+ // OR the cur # of queued msgs <= 6
+ while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
+ String value = "count=" + count;
+ lastValue = value;
+ r.put(key, value);
+ count ++;
+ queueSize = stats.getAsyncQueueSize();
+ queuedMsgs = stats.getAsyncQueuedMsgs();
+ dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ curQueuedMsgs = queuedMsgs - dequeuedMsgs;
+ }
+ getLogWriter().info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
+ } finally {
+ forceQueueFlush();
+ }
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return stats.getAsyncQueueSize() == 0;
+ }
+ public String description() {
+ return "Waiting for queues to empty";
+ }
+ };
+ final long start = System.currentTimeMillis();
+ DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+ final long finish = System.currentTimeMillis();
+ getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);
+
+ checkLastValueInOtherVm(lastValue, null);
+ }
+ }
+ /**
+ * Create a region named AckRegion with ACK scope
+ */
+ protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ if (mirror) {
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ }
+ if (conflate) {
+ factory.setEnableAsyncConflation(true);
+ }
+ final Region r = createRootRegion("AckRegion", factory.create());
+ return r;
+ }
+ /**
+  * Verifies that no-ack puts to a receiver eventually get queued and that
+  * the sender then catches up by conflating the queued updates.
+  * <p>
+  * The same key is overwritten until the distribution stats report 1000
+  * additional conflated messages; the queue is then flushed and the other
+  * vm is checked for the last value that was put.
+  */
+ public void testNoAckConflation() throws CacheException {
+   final AttributesFactory factory = new AttributesFactory();
+   factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+   factory.setEnableAsyncConflation(true);
+   final Region r = createRootRegion("slowrec", factory.create());
+   final DMStats stats = getSystem().getDistributionManager().getStats();
+
+   // the receiver in vm0 is created with a 1 ms async-distribution-timeout
+   final Properties receiverProps = new Properties();
+   receiverProps.setProperty("async-distribution-timeout", "1");
+   doCreateOtherVm(receiverProps, false);
+
+   forceQueuing(r);
+   final Object key = "key";
+   // baselines so that only work done by this test is counted
+   final long conflatedBefore = stats.getAsyncConflatedMsgs();
+   String lastValue = "";
+   final long dequeuedBefore = stats.getAsyncDequeuedMsgs();
+   long flushStart = 0;
+   try {
+     // keep overwriting the same key until 1000 conflations were recorded
+     for (int puts = 0; (stats.getAsyncConflatedMsgs() - conflatedBefore) < 1000; puts++) {
+       lastValue = "count=" + puts;
+       r.put(key, lastValue);
+     }
+     flushStart = System.currentTimeMillis();
+   } finally {
+     // always flush, even when a put failed
+     forceQueueFlush();
+   }
+   final long flushEnd = System.currentTimeMillis();
+   getLogWriter().info("After " + (flushEnd - flushStart) + " ms async msgs where flushed. A total of " + (stats.getAsyncDequeuedMsgs()-dequeuedBefore) + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The lastValue was " + lastValue);
+
+   checkLastValueInOtherVm(lastValue, null);
+ }
+ /**
+  * Verifies that puts on an ack region do not hang while queuing is forced,
+  * and that two ack updates of the same key are both queued instead of
+  * being conflated.
+  */
+ public void testAckConflation() throws CacheException {
+   final AttributesFactory factory = new AttributesFactory();
+   factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+   factory.setEnableAsyncConflation(true);
+   final Region r = createRootRegion("slowrec", factory.create());
+   final Region ar = createAckRegion(false, true);
+   ar.create("ackKey", "ackValue");
+
+   final DMStats stats = getSystem().getDistributionManager().getStats();
+
+   // the receiver in vm0 is created with a 2 ms async-distribution-timeout
+   final Properties receiverProps = new Properties();
+   receiverProps.setProperty("async-distribution-timeout", "2");
+   doCreateOtherVm(receiverProps, false);
+
+   forceQueuing(r);
+
+   final long queuedBefore = stats.getAsyncQueuedMsgs();
+   final long conflatedBefore = stats.getAsyncConflatedMsgs();
+
+   // both threads perform the very same update of the ack region
+   final Runnable ackPut = new Runnable() {
+     public void run() {
+       ar.put("ackKey", "ackValue");
+     }
+   };
+   final Thread firstPutter = new Thread(ackPut);
+   firstPutter.start();
+   final Thread secondPutter = new Thread(ackPut);
+   secondPutter.start();
+
+   // give threads a chance to get queued
+   try {
+     Thread.sleep(100);
+   } catch (InterruptedException ignore) {
+     fail("interrupted");
+   }
+   forceQueueFlush();
+   DistributedTestCase.join(firstPutter, 2 * 1000, getLogWriter());
+   DistributedTestCase.join(secondPutter, 2 * 1000, getLogWriter());
+
+   final long queuedAfter = stats.getAsyncQueuedMsgs();
+   final long conflatedAfter = stats.getAsyncConflatedMsgs();
+   // nothing may have been conflated
+   assertEquals(conflatedBefore, conflatedAfter);
+   // queue should be flushed by the time we get an ack
+   assertEquals(queuedAfter, stats.getAsyncDequeuedMsgs());
+   // exactly the two ack updates were queued
+   assertEquals(queuedBefore + 2, queuedAfter);
+ }
+ /**
+ * Make sure that only sequences of updates are conflated.
+ * Also checks that sending to a conflating region and a non-conflating
+ * region does the correct thing.
+ * <p>
+ * The test runs four phases against the "slowrec" region, each started with
+ * forceQueuing(r) and ended with forceQueueFlush(): create+put+destroy,
+ * create+put+localDestroy, put+invalidate, and finally interleaved puts to
+ * the conflating region and to the "noConflate" region.
+ * <p>
+ * Test disabled because it intermittently fails due to race conditions
+ * in test. This has been fixed in congo's tests. See bug 35357.
+ */
+ public void _disabled_testConflationSequence() throws CacheException {
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ factory.setEnableAsyncConflation(true);
+ final Region r = createRootRegion("slowrec", factory.create());
+ // the same factory is reused with conflation switched off for the second region
+ factory.setEnableAsyncConflation(false);
+ final Region noConflate = createRootRegion("noConflate", factory.create());
+ final DMStats stats = getSystem().getDistributionManager().getStats();
+
+ // create receiver in vm0 with queuing enabled
+ Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", "1");
+ doCreateOtherVm(p, false);
+ {
+ // also create the replicated "noConflate" region in the other vm
+ VM vm = getOtherVm();
+ vm.invoke(new CacheSerializableRunnable("create noConflate") {
+ public void run2() throws CacheException {
+ AttributesFactory af = new AttributesFactory();
+ af.setScope(Scope.DISTRIBUTED_NO_ACK);
+ af.setDataPolicy(DataPolicy.REPLICATE);
+ createRootRegion("noConflate", af.create());
+ }
+ });
+ }
+
+ // now make sure update+destroy does not conflate
+ final Object key = "key";
+ getLogWriter().info("[testConflationSequence] about to force queuing");
+ forceQueuing(r);
+
+ int count = 0;
+ String value = "";
+ String lastValue = value;
+ Object mylcb = null;
+ // baseline used to prove that no conflation happened during the phase
+ long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+// long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+// long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ int endCount = count+60;
+
+ getLogWriter().info("[testConflationSequence] about to build up queue");
+ long begin = System.currentTimeMillis();
+ // each iteration performs create, put and destroy (with a callback argument) on the same key
+ while (count < endCount) {
+ value = "count=" + count;
+ lastValue = value;
+ r.create(key, value);
+ count ++;
+ value = "count=" + count;
+ lastValue = value;
+ r.put(key, value);
+ count ++;
+ mylcb = value;
+ r.destroy(key, mylcb);
+ count ++;
+ // NOTE(review): null presumably tells checkLastValueInOtherVm that the entry is gone - confirm
+ lastValue = null;
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ // fail if this phase takes two minutes or longer
+ assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+ }
+ // the conflated message count must not have changed
+ assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+ forceQueueFlush();
+ checkLastValueInOtherVm(lastValue, mylcb);
+
+ // now make sure create+update+localDestroy does not conflate
+ getLogWriter().info("[testConflationSequence] force queuing create-update-destroy");
+ forceQueuing(r);
+ initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ endCount = count + 40;
+
+ getLogWriter().info("[testConflationSequence] create-update-destroy");
+ begin = System.currentTimeMillis();
+ // each iteration performs create and put, then removes the entry locally only
+ while (count < endCount) {
+ value = "count=" + count;
+ lastValue = value;
+ r.create(key, value);
+ count++;
+ value = "count=" + count;
+ lastValue = value;
+ r.put(key, value);
+ count ++;
+ r.localDestroy(key);
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ // fail if this phase takes two minutes or longer
+ assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+ }
+ // the conflated message count must not have changed
+ assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+ forceQueueFlush();
+ checkLastValueInOtherVm(lastValue, null);
+
+ // now make sure update+invalidate does not conflate
+ getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
+ forceQueuing(r);
+ initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ // the entry is created once up front; the loop below only puts and invalidates
+ value = "count=" + count;
+ lastValue = value;
+ r.create(key, value);
+ count++;
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ endCount = count + 40;
+
+ getLogWriter().info("[testConflationSequence] update-invalidate");
+ begin = System.currentTimeMillis();
+ while (count < endCount) {
+ value = "count=" + count;
+ lastValue = value;
+ r.put(key, value);
+ count ++;
+ r.invalidate(key);
+ count ++;
+ // after the invalidate the other vm is checked against the CHECK_INVALID marker
+ lastValue = CHECK_INVALID;
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ // fail if this phase takes two minutes or longer
+ assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+ }
+ // the conflated message count must not have changed
+ assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+ forceQueueFlush();
+ getLogWriter().info("[testConflationSequence] assert other vm");
+ checkLastValueInOtherVm(lastValue, null);
+
+ r.destroy(key);
+
+ // now make sure updates to a conflating region are conflated even while
+ // updates to a non-conflating are not.
+ getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
+ forceQueuing(r);
+ final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
+// initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+
+ value = "count=" + count;
+ lastValue = value;
+ // local mirrors of the stats: they are advanced by hand after every
+ // operation and compared with the real counters
+ long conflatedMsgs = stats.getAsyncConflatedMsgs();
+ long queuedMsgs = stats.getAsyncQueuedMsgs();
+ // the first create and put on each region are each expected to be queued, not conflated
+ r.create(key, value);
+ queuedMsgs++;
+ assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+ assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+ r.put(key, value);
+ queuedMsgs++;
+ assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+ assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+ noConflate.create(key, value);
+ queuedMsgs++;
+ assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+ assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+ noConflate.put(key, value);
+ queuedMsgs++;
+ assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+ assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+ count++;
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ endCount = count + 80;
+
+ begin = System.currentTimeMillis();
+ getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
+ + " queuedMsgs=" + stats.getAsyncQueuedMsgs()
+ + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
+ + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
+ + " asyncSocketWrites=" + stats.getAsyncSocketWrites()
+ );
+ while (count < endCount) {
+ // make sure we continue to have a flush in progress
+ assertEquals(1, stats.getAsyncThreads());
+ assertEquals(1, stats.getAsyncQueues());
+ assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
+ // make sure we are not completing any flushing while this loop is in progress
+ assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
+ value = "count=" + count;
+ lastValue = value;
+ r.put(key, value);
+ count ++;
+ // make sure it was conflated and not queued
+ assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+ conflatedMsgs++;
+ assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+ noConflate.put(key, value);
+ // make sure it was queued and not conflated
+ queuedMsgs++;
+ assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+ assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+// dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+ // fail if this phase takes two minutes or longer
+ assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+ }
+
+ forceQueueFlush();
+ getLogWriter().info("[testConflationSequence] assert other vm");
+ checkLastValueInOtherVm(lastValue, null);
+ }
+ /**
+ * Make sure that exceeding the queue size limit causes a disconnect.
+ */
+ public void testSizeDisconnect() throws CacheException {
+ final String expected =
+ "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
+ "||java.io.IOException: Broken pipe";
+ final String addExpected =
+ "<ExpectedException action=add>" + expected + "</ExpectedException>";
+ final String removeExpected =
+ "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ final Region r = createRootRegion("slowrec", factory.create());
+ final DM dm = getSystem().getDistributionManager();
+ final DMStats stats = dm.getStats();
+ // set others before vm0 connects
+ final Set others = dm.getOtherDistributionManagerIds();
+
+ // create receiver in vm0 with queuing enabled
+ Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", "5");
+ p.setProperty("async-max-queue-size", "1"); // 1 meg
+ doCreateOtherVm(p, false);
+
+
+ final Object key = "key";
+ final int VALUE_SIZE = 1024 * 100; // .1M async-max-queue-size should give us 10 of these 100K msgs before queue full
+ final byte[] value = new byte[VALUE_SIZE];
+ int count = 0;
+ forceQueuing(r);
+ long queuedMsgs = stats.getAsyncQueuedMsgs();
+ long queueSize = stats.getAsyncQueueSize();
+
+ getCache().getLogger().info(addExpected);
+ try {
+ while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
+ r.put(key, value);
+ count ++;
+ if (stats.getAsyncQueueSize() > 0) {
+ queuedMsgs = stats.getAsyncQueuedMsgs();
+ queueSize = stats.getAsyncQueueSize();
+ }
+ if (count > 100) {
+ fail("should have exceeded max-queue-size by now");
+ }
+ }
+ getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
+ // make sure we lost a connection to vm0
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return dm.getOtherDistributionManagerIds().size() <= others.size()
+ && stats.getAsyncQueueSize() == 0;
+ }
+ public String description() {
+ return "waiting for connection loss";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+ }
+ finally {
+ forceQueueFlush();
+ getCache().getLogger().info(removeExpected);
+ }
+ assertEquals(others, dm.getOtherDistributionManagerIds());
+ assertEquals(0, stats.getAsyncQueueSize());
+ }
+ /**
+ * Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
+ * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
+ * in build.xml in the splitbrainNov07 branch. It had been disabled since
+ * June 2006 due to hangs. Some of the tests, like this one, still need
+ * work because the periodically (some quite often) fail.
+ */
+ public void donottestTimeoutDisconnect() throws CacheException {
+ final String expected =
+ "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
+ "||java.io.IOException: Broken pipe";
+ final String addExpected =
+ "<ExpectedException action=add>" + expected + "</ExpectedException>";
+ final String removeExpected =
+ "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ final Region r = createRootRegion("slowrec", factory.create());
+ final DM dm = getSystem().getDistributionManager();
+ final DMStats stats = dm.getStats();
+ // set others before vm0 connects
+ final Set others = dm.getOtherDistributionManagerIds();
+
+ // create receiver in vm0 with queuing enabled
+ Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", "5");
+ p.setProperty("async-queue-timeout", "500"); // 500 ms
+ doCreateOtherVm(p, true);
+
+
+ final Object key = "key";
+ final int VALUE_SIZE = 1024; // 1k
+ final byte[] value = new byte[VALUE_SIZE];
+ int count = 0;
+ long queuedMsgs = stats.getAsyncQueuedMsgs();
+ long queueSize = stats.getAsyncQueueSize();
+ final long timeoutLimit = System.currentTimeMillis() + 5000;
+
+ getCache().getLogger().info(addExpected);
+ try {
+ while (stats.getAsyncQueueTimeouts() == 0) {
+ r.put(key, value);
+ count ++;
+ if (stats.getAsyncQueueSize() > 0) {
+ queuedMsgs = stats.getAsyncQueuedMsgs();
+ queueSize = stats.getAsyncQueueSize();
+ }
+ if (System.currentTimeMillis() > timeoutLimit) {
+ fail("should have exceeded async-queue-timeout by now");
+ }
+ }
+ getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
+ // make sure we lost a connection to vm0
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ if (dm.getOtherDistributionManagerIds().size() > others.size()) {
+ return false;
+ }
+ return stats.getAsyncQueueSize() == 0;
+ }
+ public String description() {
+ return "waiting for departure";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+ }
+ finally {
+ getCache().getLogger().info(removeExpected);
+ }
+ assertEquals(others, dm.getOtherDistributionManagerIds());
+ assertEquals(0, stats.getAsyncQueueSize());
+ }
+
+ // static helper methods ---------------------------------------------------
+
+ // Keys with a special meaning for ControlListener: an afterCreate or
+ // afterUpdate event for one of these keys makes the listener sleep, wait
+ // on its control lock, or disconnect from the distributed system.
+ private static final String KEY_SLEEP = "KEY_SLEEP";
+ private static final String KEY_WAIT = "KEY_WAIT";
+ private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
+
+ // Callback type codes recorded by ControlListener.
+ protected final static int CALLBACK_CREATE = 0;
+ protected final static int CALLBACK_UPDATE = 1;
+ protected final static int CALLBACK_INVALIDATE = 2;
+ protected final static int CALLBACK_DESTROY = 3;
+ protected final static int CALLBACK_REGION_INVALIDATE = 4;
+
+ // Boxed forms of the type codes above. Integer.valueOf is used instead of
+ // the deprecated Integer constructor; the values are equal() to the old ones.
+ protected final static Integer CALLBACK_CREATE_INTEGER = Integer.valueOf(CALLBACK_CREATE);
+ protected final static Integer CALLBACK_UPDATE_INTEGER = Integer.valueOf(CALLBACK_UPDATE);
+ protected final static Integer CALLBACK_INVALIDATE_INTEGER = Integer.valueOf(CALLBACK_INVALIDATE);
+ protected final static Integer CALLBACK_DESTROY_INTEGER = Integer.valueOf(CALLBACK_DESTROY);
+ protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = Integer.valueOf(CALLBACK_REGION_INVALIDATE);
+
+ /**
+  * Immutable pair of a callback argument and the type code (one of the
+  * CALLBACK_* constants) of the event that delivered it.
+  */
+ private static class CallbackWrapper {
+   public final Object callbackArgument;
+   public final int callbackType;
+
+   public CallbackWrapper(Object callbackArgument, int callbackType) {
+     this.callbackArgument = callbackArgument;
+     this.callbackType = callbackType;
+   }
+
+   /**
+    * Describes the wrapper for log output. String concatenation is used
+    * instead of callbackArgument.toString() so that a null argument is
+    * rendered as "null" rather than throwing a NullPointerException.
+    */
+   @Override
+   public String toString() {
+     return "CallbackWrapper: " + callbackArgument + " of type " + callbackType;
+   }
+ }
+
+ protected static class ControlListener extends CacheListenerAdapter {
+ public final LinkedList callbackArguments = new LinkedList();
+ public final LinkedList callbackTypes = new LinkedList();
+ public final Object CONTROL_LOCK = new Object();
+
+ public void afterCreate(EntryEvent event) {
+ getLogWriter().info(event.getRegion().getName() + " afterCreate " + event.getKey());
+ synchronized(this.CONTROL_LOCK) {
+ if (event.getCallbackArgument() != null) {
+ this.callbackArguments.add(
+ new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
+ this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
+ this.CONTROL_LOCK.notifyAll();
+ }
+ }
+ processEvent(event);
+ }
+ public void afterUpdate(EntryEvent event) {
+ getLogWriter().info(event.getRegion().getName() + " afterUpdate " + event.getKey());
+ synchronized(this.CONTROL_LOCK) {
+ if (event.getCallbackArgument() != null) {
+ this.callbackArguments.add(
+ new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
+ this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
+ this.CONTROL_LOCK.notifyAll();
+ }
+ }
+ processEvent(event);
+ }
+ public void afterInvalidate(EntryEvent event) {
+ synchronized(this.CONTROL_LOCK) {
+ if (event.getCallbackArgument() != null) {
+ this.callbackArguments.add(
+ new CallbackWrapper(event.getCallbackArgument(), CALLBACK_INVALIDATE));
+ this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
+ this.CONTROL_LOCK.notifyAll();
+ }
+ }
+ }
+ public void afterDestroy(EntryEvent event) {
+ synchronized(this.CONTROL_LOCK) {
+ if (event.getCallbackArgument() != null) {
+ this.callbackArguments.add(
+ new CallbackWrapper(event.getCallbackArgument(), CALLBACK_DESTROY));
+ this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
+ this.CONTROL_LOCK.notifyAll();
+ }
+ }
+ }
+ public void afterRegionInvalidate(RegionEvent event) {
+ synchronized(this.CONTROL_LOCK) {
+ if (event.getCallbackArgument() != null) {
+ this.callbackArguments.add(
+ new CallbackWrapper(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE));
+ this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
+ this.CONTROL_LOCK.notifyAll();
+ }
+ }
+ }
+ private void processEvent(EntryEvent event) {
+ if (event.getKey().equals(KEY_SLEEP)) {
+ processSleep(event);
+ }
+ else if (event.getKey().equals(KEY_WAIT)) {
+ processWait(event);
+ }
+ else if (event.getKey().equals(KEY_DISCONNECT)) {
+ processDisconnect(event);
+ }
+ }
+ private void processSleep(EntryEvent event) {
+ int sleepMs = ((Integer)event.getNewValue()).intValue();
+ getLogWriter().info("[processSleep] sleeping for " + sleepMs);
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ private void processWait(EntryEvent event) {
+ int sleepMs = ((Integer)event.getNewValue()).intValue();
+ getLogWriter().info("[processWait] waiting for " + sleepMs);
+ synchronized(this.CONTROL_LOCK) {
+ try {
+ this.CONTROL_LOCK.wait(sleepMs);
+ } catch (InterruptedException ignore) {return;}
+ }
+ }
+ private void processDisconnect(EntryEvent event) {
+ getLogWriter().info("[processDisconnect] disconnecting");
+ disconnectFromDS();
+ }
+ };
+
+ /**
+  * Make sure multiple no-ack regions conflate properly.
+  * [bruce] disabled when use of this dunit test class was reenabled in
+  * the splitbrainNov07 branch. The class had been disabled since
+  * June 2006 r13222 in the trunk. This test is failing because conflation
+  * isn't kicking in for some reason.
+  */
+ public void donottestMultipleRegionConflation() throws Throwable {
+   // notifies everybody waiting on the R1 listener's control lock in the other vm
+   final SerializableRunnable wakeUpOtherVm = new SerializableRunnable("Wake up other vm") {
+     public void run() {
+       final Object controlLock = doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK;
+       synchronized (controlLock) {
+         controlLock.notifyAll();
+       }
+     }
+   };
+   try {
+     doTestMultipleRegionConflation();
+   }
+   catch (VirtualMachineError fatal) {
+     SystemFailure.initiateFailure(fatal);
+     throw fatal;
+   }
+   catch (Throwable failure) {
+     getLogWriter().error("Encountered exception: ", failure);
+     throw failure;
+   }
+   finally {
+     // make sure other vm was notified even if test failed
+     getOtherVm().invoke(wakeUpOtherVm);
+   }
+ }
+ // Listeners installed on "slowrec1" (R1) and "slowrec2" (R2) by
+ // doTestMultipleRegionConflation. They are assigned inside the runnable that
+ // is invoked on the other vm and are read again by later runnables invoked there.
+ protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
+ protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
+ private void doTestMultipleRegionConflation() throws Exception {
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ factory.setEnableAsyncConflation(true);
+ final Region r1 = createRootRegion("slowrec1", factory.create());
+ final Region r2 = createRootRegion("slowrec2", factory.create());
+
+ assertTrue(getSystem().isConnected());
+ assertNotNull(r1);
+ assertFalse(r1.isDestroyed());
+ assertNotNull(getCache());
+ assertNotNull(getCache().getRegion("slowrec1"));
+ assertNotNull(r2);
+ assertFalse(r2.isDestroyed());
+ assertNotNull(getCache());
+ assertNotNull(getCache().getRegion("slowrec2"));
+
+ final DM dm = getSystem().getDistributionManager();
+ final Serializable controllerVM = dm.getDistributionManagerId();
+ final DMStats stats = dm.getStats();
+ final int millisToWait = 1000 * 60 * 5; // 5 minutes
+
+ // set others before vm0 connects
+ long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+
+ // create receiver in vm0 with queuing enabled
+ final Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", "5");
+ p.setProperty("async-queue-timeout", "86400000"); // max value
+ p.setProperty("async-max-queue-size", "1024"); // max value
+
+ getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+ public void run2() throws CacheException {
+ getSystem(p);
+
+ DM dm = getSystem().getDistributionManager();
+ assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
+
+ AttributesFactory af = new AttributesFactory();
+ af.setScope(Scope.DISTRIBUTED_NO_ACK);
+ af.setDataPolicy(DataPolicy.REPLICATE);
+
+ doTestMultipleRegionConflation_R1_Listener = new ControlListener();
+ af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
+ createRootRegion("slowrec1", af.create());
+
+ doTestMultipleRegionConflation_R2_Listener = new ControlListener();
+ af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
+ createRootRegion("slowrec2", af.create());
+ }
+ });
+
+ // put vm0 cache listener into wait
+ getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 into wait");
+ r1.put(KEY_WAIT, new Integer(millisToWait));
+
+ // build up queue size
+ getLogWriter().info("[doTestMultipleRegionConflation] building up queue size...");
+ final Object key = "key";
+ final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+ final int VALUE_SIZE = socketBufferSize*3;
+ //final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
+ final byte[] value = new byte[VALUE_SIZE];
+
+ int count = 0;
+ while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+ count++;
+ r1.put(key, value);
+ }
+
+ getLogWriter().info("[doTestMultipleRegionConflation] After " +
+ count + " puts of size " + VALUE_SIZE +
+ " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+ // put values that will be asserted
+ final Object key1 = "key1";
+ final Object key2 = "key2";
+ Object putKey = key1;
+ boolean flag = true;
+ for (int i = 0; i < 30; i++) {
+ if (i == 10) putKey = key2;
+ if (flag) {
+ if (i == 6) {
+ r1.invalidate(putKey, new Integer(i));
+ } else if (i == 24) {
+ r1.invalidateRegion(new Integer(i));
+ } else {
+ r1.put(putKey, value, new Integer(i));
+ }
+ } else {
+ if (i == 15) {
+ r2.destroy(putKey, new Integer(i));
+ } else {
+ r2.put(putKey, value, new Integer(i));
+ }
+ }
+ flag = !flag;
+ }
+
+ // r1: key1, 0, create
+ // r1: key1, 4, update
+ // r1: key1, 6, invalidate
+ // r1: key1, 8, update
+
+ // r1: key2, 10, create
+ // r1: 24, invalidateRegion
+ // r1: key2, 28, update
+
+ // r2: key1, 1, create
+ // r2: key1, 9, update
+
+ // r2: key2, 11, create
+ // r2: key2, 13, update
+ // r2: key2, 15, destroy
+ // r2: key2, 17, create
+ // r2: key2, 29, update
+
+ final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 };
+ final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
+ { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
+ CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE };
+
+ final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
+ final int[] r2ExpectedTypes = new int[]
+ { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
+ CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE };
+
+ // send notify to vm0
+ getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
+ getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+ public void run() {
+ synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+ doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
+ }
+ }
+ });
+
+ // wait for queue to be flushed
+ getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
+ getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
+ public void run() {
+ try {
+ synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+ while (doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
+ doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
+ }
+ }
+ synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
+ while (doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
+ doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
+ }
+ }
+ } catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ });
+
+ // assert values on both listeners
+ getLogWriter().info("[doTestMultipleRegionConflation] assert callback arguments");
+ getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
+ public void run() {
+ synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+ getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
+ getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
+ assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
+ doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
+ int i = 0;
+ for (Iterator iter = doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter.hasNext();) {
+ CallbackWrapper wrapper = (CallbackWrapper) iter.next();
+ assertEquals(new Integer(r1ExpectedArgs[i]),
+ wrapper.callbackArgument);
+ assertEquals(new Integer(r1ExpectedTypes[i]),
+ doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
+ i++;
+ }
+ }
+ synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
+ getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
+ getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
+ assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
+ doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
+ int i = 0;
+ for (Iterator iter = doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter.hasNext();) {
+ CallbackWrapper wrapper = (CallbackWrapper) iter.next();
+ assertEquals(new Integer(r2ExpectedArgs[i]),
+ wrapper.callbackArgument);
+ assertEquals(new Integer(r2ExpectedTypes[i]),
+ doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
+ i++;
+ }
+ }
+ }
+ });
+ }
+
+ /**
+  * Verifies that the async queue is released once the slow receiver
+  * disconnects. The actual scenario lives in doTestDisconnectCleanup();
+  * this wrapper logs failures and always wakes the other vm up afterwards.
+  */
+ public void testDisconnectCleanup() throws Throwable {
+   // notifies everybody waiting on the listener's control lock in the other vm
+   final SerializableRunnable wakeUpOtherVm = new SerializableRunnable("Wake up other vm") {
+     public void run() {
+       final Object controlLock = doTestDisconnectCleanup_Listener.CONTROL_LOCK;
+       synchronized (controlLock) {
+         controlLock.notifyAll();
+       }
+     }
+   };
+   try {
+     doTestDisconnectCleanup();
+   }
+   catch (VirtualMachineError fatal) {
+     SystemFailure.initiateFailure(fatal);
+     throw fatal;
+   }
+   catch (Throwable failure) {
+     getLogWriter().error("Encountered exception: ", failure);
+     throw failure;
+   }
+   finally {
+     // make sure other vm was notified even if test failed
+     getOtherVm().invoke(wakeUpOtherVm);
+   }
+ }
+ // Listener installed on "slowrec" by doTestDisconnectCleanup. It is assigned
+ // inside the runnable that is invoked on the other vm and is read again by
+ // the wake-up runnables invoked there.
+ protected static ControlListener doTestDisconnectCleanup_Listener;
+ /**
+ * Body of the disconnect cleanup test.
+ * <p>
+ * Puts KEY_WAIT followed by KEY_DISCONNECT into "slowrec" (the
+ * ControlListener in the other vm waits on its control lock for the former
+ * and disconnects from the distributed system for the latter), builds up
+ * the async queue with large values, wakes the other vm up, and then
+ * asserts that the member is gone and that the number of async queues
+ * drops back to its initial value.
+ */
+ private void doTestDisconnectCleanup() throws Exception {
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ final Region r = createRootRegion("slowrec", factory.create());
+ final DM dm = getSystem().getDistributionManager();
+ final DMStats stats = dm.getStats();
+ // set others before vm0 connects
+ final Set others = dm.getOtherDistributionManagerIds();
+ // stat baselines taken before the receiver exists
+ long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+ final int initialQueues = stats.getAsyncQueues();
+
+ // create receiver in vm0 with queuing enabled
+ final Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", "5");
+ p.setProperty("async-queue-timeout", "86400000"); // max value
+ p.setProperty("async-max-queue-size", "1024"); // max value
+
+ getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+ public void run2() throws CacheException {
+ getSystem(p);
+ AttributesFactory af = new AttributesFactory();
+ af.setScope(Scope.DISTRIBUTED_NO_ACK);
+ af.setDataPolicy(DataPolicy.REPLICATE);
+
+ doTestDisconnectCleanup_Listener = new ControlListener();
+ af.setCacheListener(doTestDisconnectCleanup_Listener);
+ createRootRegion("slowrec", af.create());
+ }
+ });
+
+ // put vm0 cache listener into wait
+ getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
+ int millisToWait = 1000 * 60 * 5; // 5 minutes
+ r.put(KEY_WAIT, new Integer(millisToWait));
+ // the disconnect request is sent right after the wait request
+ r.put(KEY_DISCONNECT, KEY_DISCONNECT);
+
+ // build up queue size
+ getLogWriter().info("[testDisconnectCleanup] building up queue size...");
+ final Object key = "key";
+ final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+ final int VALUE_SIZE = socketBufferSize*3;
+ //final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
+ final byte[] value = new byte[VALUE_SIZE];
+
+ int count = 0;
+ // every loop below fails the test once this deadline has passed
+ final long abortMillis = System.currentTimeMillis() + millisToWait;
+ // put until the first message gets queued
+ while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+ count++;
+ r.put(key, value);
+ assertFalse(System.currentTimeMillis() >= abortMillis);
+ }
+
+ getLogWriter().info("[testDisconnectCleanup] After " +
+ count + " puts of size " + VALUE_SIZE +
+ " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+ // keep putting until at least 10 messages and 10 values' worth of bytes are queued
+ while (stats.getAsyncQueuedMsgs() < 10 ||
+ stats.getAsyncQueueSize() < VALUE_SIZE*10) {
+ count++;
+ r.put(key, value);
+ assertFalse(System.currentTimeMillis() >= abortMillis);
+ }
+ assertTrue(stats.getAsyncQueuedMsgs() >= 10);
+
+ // wait until the stats report at least one async queue
+ while (stats.getAsyncQueues() < 1) {
+ pause(100);
+ assertFalse(System.currentTimeMillis() >= abortMillis);
+ }
+
+ getLogWriter().info("[testDisconnectCleanup] After " +
+ count + " puts of size " + VALUE_SIZE + " queue size has reached " +
+ stats.getAsyncQueueSize() + " bytes and number of queues is " +
+ stats.getAsyncQueues() + ".");
+
+ assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
+ assertEquals(initialQueues+1, stats.getAsyncQueues());
+
+ // assert vm0 is still connected
+ assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
+
+ // send notify to vm0
+ getLogWriter().info("[testDisconnectCleanup] wake up vm0");
+ getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+ public void run() {
+ synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
+ doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
+ }
+ }
+ });
+
+ // make sure we lost a connection to vm0
+ getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return dm.getOtherDistributionManagerIds().size() <= others.size();
+ }
+ public String description() {
+ return "waiting for disconnect";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+ assertEquals(others, dm.getOtherDistributionManagerIds());
+
+ // check free memory... perform wait loop with System.gc
+ getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
+ ev = new WaitCriterion() {
+ public boolean done() {
+ if (stats.getAsyncQueues() <= initialQueues) {
+ return true;
+ }
+ // request a gc before the criterion is evaluated again
+ Runtime.getRuntime().gc();
+ return false;
+ }
+ public String description() {
+ return "waiting for queue cleanup";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+// getLogWriter().info("[testDisconnectCleanup] initialQueues=" +
+// initialQueues + " asyncQueues=" + stats.getAsyncQueues());
+ assertEquals(initialQueues, stats.getAsyncQueues());
+ }
+
+ /**
+ * Make sure the receiver sees the put that triggered queuing, then only the last conflated put.<p>
+ * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
+ * in build.xml in the splitbrainNov07 branch. It had been disabled since
+ * June 2006 due to hangs. Some of the tests, like this one, still need
+ * work because they periodically (some quite often) fail.
+ */
+ public void donottestPartialMessage() throws Throwable {
+ try {
+ doTestPartialMessage();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable t) {
+ getLogWriter().error("Encountered exception: ", t);
+ throw t;
+ }
+ finally {
+ // make sure other vm was notified even if test failed
+ getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+ public void run() {
+ synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+ doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
+ }
+ }
+ });
+ }
+ }
+ protected static ControlListener doTestPartialMessage_Listener;
+ private void doTestPartialMessage() throws Exception {
+ final AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+ factory.setEnableAsyncConflation(true);
+ final Region r = createRootRegion("slowrec", factory.create());
+ final DM dm = getSystem().getDistributionManager();
+ final DMStats stats = dm.getStats();
+
+ // set others before vm0 connects
+// final Set others = dm.getOtherDistributionManagerIds();
+ long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+// int initialQueues = stats.getAsyncQueues();
+
+ // create receiver in vm0 with queuing enabled
+ final Properties p = new Properties();
+ p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4 sec
+ p.setProperty("async-queue-timeout", "86400000"); // max value
+ p.setProperty("async-max-queue-size", "1024"); // max value
+
+ getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+ public void run2() throws CacheException {
+ getSystem(p);
+ AttributesFactory af = new AttributesFactory();
+ af.setScope(Scope.DISTRIBUTED_NO_ACK);
+ af.setDataPolicy(DataPolicy.REPLICATE);
+
+ doTestPartialMessage_Listener = new ControlListener();
+ af.setCacheListener(doTestPartialMessage_Listener);
+ createRootRegion("slowrec", af.create());
+ }
+ });
+
+ // put vm0 cache listener into wait
+ getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
+ final int millisToWait = 1000 * 60 * 5; // 5 minutes
+ r.put(KEY_WAIT, new Integer(millisToWait));
+
+ // build up queue size
+ getLogWriter().info("[testPartialMessage] building up queue size...");
+ final Object key = "key";
+ final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+ final int VALUE_SIZE = socketBufferSize*3;
+ //1024 * 20; // 20 KB
+ final byte[] value = new byte[VALUE_SIZE];
+
+ int count = 0;
+ while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+ count++;
+ r.put(key, value, new Integer(count));
+ }
+
+ final int partialId = count;
+ assertEquals(0, stats.getAsyncConflatedMsgs());
+
+ getLogWriter().info("[testPartialMessage] After " +
+ count + " puts of size " + VALUE_SIZE +
+ " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+ pause(2000);
+
+ // conflate 10 times
+ while (stats.getAsyncConflatedMsgs() < 10) {
+ count++;
+ r.put(key, value, new Integer(count));
+ if (count == partialId+1) {
+// long begin = System.currentTimeMillis();
+// while (stats.getAsyncQueues() < 1) {
+// pause(100);
+// assertFalse(System.currentTimeMillis() > begin+1000*10);
+// }
+ assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
+ assertEquals(0, stats.getAsyncConflatedMsgs());
+ } else if (count == partialId+2) {
+ assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
+ assertEquals(1, stats.getAsyncConflatedMsgs());
+ }
+ }
+
+ final int conflateId = count;
+
+ final int[] expectedArgs = { partialId, conflateId };
+
+ // send notify to vm0
+ getLogWriter().info("[testPartialMessage] wake up vm0");
+ getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+ public void run() {
+ synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+ doTestPartialMessage_Listener.CONTROL_LOCK.notify();
+ }
+ }
+ });
+
+ // wait for queue to be flushed
+ getLogWriter().info("[testPartialMessage] wait for vm0");
+ getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
+ public void run() {
+ try {
+ synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+ boolean done = false;
+ while (!done) {
+ if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
+ CallbackWrapper last = (CallbackWrapper)
+ doTestPartialMessage_Listener.callbackArguments.getLast();
+ Integer lastId = (Integer) last.callbackArgument;
+ if (lastId.intValue() == conflateId) {
+ done = true;
+ } else {
+ doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
+ }
+ } else {
+ doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
+ }
+ }
+ }
+ } catch (InterruptedException ignore) {fail("interrupted");}
+ }
+ });
+
+ // assert callback arguments recorded by the listener in the other vm
+ getLogWriter().info("[testPartialMessage] assert callback arguments");
+ getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
+ public void run() {
+ synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+ getLogWriter().info("[testPartialMessage] " +
+ "doTestPartialMessage_Listener.callbackArguments=" +
+ doTestPartialMessage_Listener.callbackArguments);
+
+ assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
+ doTestPartialMessage_Listener.callbackTypes.size());
+
+ int i = 0;
+ Iterator argIter =
+ doTestPartialMessage_Listener.callbackArguments.iterator();
+ Iterator typeIter =
+ doTestPartialMessage_Listener.callbackTypes.iterator();
+
+ while (argIter.hasNext()) {
+ CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
+ Integer arg = (Integer) wrapper.callbackArgument;
+ typeIter.next(); // Integer type
+ if (arg.intValue() < partialId) {
+ continue;
+ }
+ assertEquals(new Integer(expectedArgs[i]), arg);
+ //assertEquals(CALLBACK_UPDATE_INTEGER, type);
+ i++;
+ }
+ }
+ }
+ });
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
index 41b3042..55a63ff 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
@@ -24,7 +24,11 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
+import org.junit.experimental.categories.Category;
+
import com.gemstone.gemfire.distributed.internal.membership.*;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
import junit.framework.AssertionFailedError;
/**
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java
new file mode 100755
index 0000000..e07e9fc
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.distributed;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class KirkDUnitTest {
+
+ @Test
+ public void testMe() {
+ Assert.assertTrue(true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java
new file mode 100755
index 0000000..8e267db
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ //DistributedMemberDUnitTest.class,
+ KirkDUnitTest.class,
+})
+/**
+ * Suite of tests for distributed membership dunit tests.
+ */
+public class KirkDistributedTestSuite {
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java
new file mode 100755
index 0000000..f39fd05
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ LauncherMemberMXBeanJUnitTest.class,
+})
+/**
+ * @author Kirk Lund
+ */
+public class KirkIntegrationTestSuite {
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java
deleted file mode 100644
index 67de3e3..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.tcpserver;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Properties;
-import java.util.Vector;
-
-import junit.framework.Assert;
-
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.distributed.Locator;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.Version;
-
-import dunit.DistributedTestCase;
-import dunit.Host;
-import dunit.VM;
-
-/**
- * This tests the rolling upgrade for locators with
- * different GOSSIPVERSION.
- *
- * @author shobhit
- *
- */
-public class TcpServerBackwardCompatDUnitDisabledTest extends DistributedTestCase {
-
- /**
- * @param name
- */
- public TcpServerBackwardCompatDUnitDisabledTest(String name) {
- super(name);
- }
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- disconnectAllFromDS();
- invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting true") {
-
- @Override
- public void run2() throws CacheException {
- TcpServer.isTesting = true;
- }
- });
- }
-
- @Override
- public void tearDown2() throws Exception {
- invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting true") {
-
- @Override
- public void run2() throws CacheException {
- TcpServer.isTesting = false;
- }
- });
- super.tearDown2();
- }
-
- /**
- * This test starts two locators with current GOSSIPVERSION
- * and then shuts down one of them and restart it with new
- * GOSSIPVERSION and verifies that it has recoverd the system
- * View. Then we upgrade next locator.
- */
- public void testGossipVersionBackwardCompatibility() {
- Host host = Host.getHost(0);
- final VM locator0 = host.getVM(0);
- final VM locator1 = host.getVM(1);
- final VM locatorRestart0 = host.getVM(2);
- final VM member = host.getVM(3);
-
- // Create properties for locator0
- final int port0 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- final File logFile0 = new File(getUniqueName() + "-locator" + port0 + ".log");
-
- // Create properties for locator1
- int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- while (port == port0) {
- port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- }
- final int port1 = port;
-
- final File logFile1 = new File(getUniqueName() + "-locator" + port1 + ".log");
-
- final String locators = host.getHostName() + "[" + port0 + "]," +
- host.getHostName() + "[" + port1 + "]";
-
- final Properties props = new Properties();
- props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-
- // Start locator0 with props.
- //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
- locator0.invoke(new CacheSerializableRunnable("Starting first locator on port " + port0) {
-
- @Override
- public void run2() throws CacheException {
- try {
- TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
-
- Locator.startLocatorAndDS(port0, logFile0, props);
- } catch (IOException e) {
- fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
- }
- }
- });
-
- // Start a new member to add it to discovery set of locator0.
- member.invoke(new CacheSerializableRunnable("Start a member") {
-
- @Override
- public void run2() throws CacheException {
- disconnectFromDS();
- TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
- InternalDistributedSystem.connect(props);
- }
- });
-
- // Start locator1 with props.
- //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port1+"]");
- locator1.invoke(new CacheSerializableRunnable("Starting second locator on port " + port1) {
-
- @Override
- public void run2() throws CacheException {
- try {
- TcpServer.TESTVERSION -= 100;
- TcpServer.OLDTESTVERSION -= 100;
- TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
- TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
- assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
- assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
-
- Locator.startLocatorAndDS(port1, logFile1, props);
-
- // Start a gossip client to connect to first locator "locator0".
- fail("this test must be fixed to work with the jgroups replacement");
- // TODO
-// final GossipClient client = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1), 500);
-// client.register("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), 5000, false);
-
- WaitCriterion ev = new WaitCriterion() {
- public boolean done() {
- try {
- // TODO
-// Vector members = client.getMembers("mygroup1",
-// new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
-// return members.size() == 2;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("unexpected exception");
- }
- return false; // NOTREACHED
- }
- public String description() {
- return null;
- }
- };
-
- DistributedTestCase.waitForCriterion(ev, 1000, 200, true);
- fail("this test must be fixed to work with the jgroups replacement");
- // TODO
-// Vector members = client.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
-// Assert.assertEquals(2, members.size());
-// Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port0)));
-// Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port1)));
-
- } catch (IOException e) {
- fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
- }
- }
- });
-
- // Stop first locator currently running in locator0 VM.
- locator0.invoke(new CacheSerializableRunnable("Stopping first locator") {
-
- @Override
- public void run2() throws CacheException {
- Locator.getLocator().stop();
- disconnectFromDS();
- }
- });
-
- // Restart first locator in new VM.
- //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
- locatorRestart0.invoke(new CacheSerializableRunnable("Restarting first locator on port " + port0) {
-
- @Override
- public void run2() throws CacheException {
- try {
- TcpServer.TESTVERSION -= 100;
- TcpServer.OLDTESTVERSION -= 100;
- TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
- TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
- assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
- assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
-
- Locator.startLocatorAndDS(port0, logFile0, props);
-
- // A new gossip client with new GOSSIPVERSION must be able
- // to connect with new locator on port1, remote locator.
- // Reuse locator0 VM.
- fail("this test must be fixed to work with the jgroups replacement");
- // TODO
-// final GossipClient client2 = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1), 500);
-// Vector<IpAddress> members = client2.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), true, 5000);
-// Assert.assertEquals(2, members.size());
- // As they are coming from other locator, their pid is of other locator process.
-// getLogWriter().info(members.get(0) + " " + members.get(1));
-
- // TODO
-// for (IpAddress ipAddr : members) {
-// int port = ipAddr.getPort();
-// String hostname = ipAddr.getIpAddress().getHostAddress();
-// int pid = ipAddr.getProcessId();
-// Assert.assertTrue(" " + ipAddr, port == port0 || port == port1);
-// Assert.assertTrue(" " + ipAddr, hostname.equals(InetAddress.getLocalHost().getHostAddress()));
-// Assert.assertTrue(" " + ipAddr, pid == locator1.getPid());
-// }
-
- } catch (IOException e) {
- fail("Locator0 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
- }
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java
new file mode 100644
index 0000000..2f5b80b
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.tcpserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.Vector;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import junit.framework.Assert;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * This tests the rolling upgrade for locators with
+ * different GOSSIPVERSION.
+ *
+ * @author shobhit
+ *
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class TcpServerBackwardCompatDUnitTest extends DistributedTestCase {
+
+ /**
+ * @param name
+ */
+ public TcpServerBackwardCompatDUnitTest(String name) {
+ super(name);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ disconnectAllFromDS();
+ invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting true") {
+
+ @Override
+ public void run2() throws CacheException {
+ TcpServer.isTesting = true;
+ }
+ });
+ }
+
+ @Override
+ public void tearDown2() throws Exception {
+ invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting true") {
+
+ @Override
+ public void run2() throws CacheException {
+ TcpServer.isTesting = false;
+ }
+ });
+ super.tearDown2();
+ }
+
+ /**
+ * This test starts two locators with current GOSSIPVERSION
+ * and then shuts down one of them and restarts it with new
+ * GOSSIPVERSION and verifies that it has recovered the system
+ * View. Then we upgrade next locator.
+ */
+ public void testGossipVersionBackwardCompatibility() {
+ Host host = Host.getHost(0);
+ final VM locator0 = host.getVM(0);
+ final VM locator1 = host.getVM(1);
+ final VM locatorRestart0 = host.getVM(2);
+ final VM member = host.getVM(3);
+
+ // Create properties for locator0
+ final int port0 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ final File logFile0 = new File(getUniqueName() + "-locator" + port0 + ".log");
+
+ // Create properties for locator1
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ while (port == port0) {
+ port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ }
+ final int port1 = port;
+
+ final File logFile1 = new File(getUniqueName() + "-locator" + port1 + ".log");
+
+ final String locators = host.getHostName() + "[" + port0 + "]," +
+ host.getHostName() + "[" + port1 + "]";
+
+ final Properties props = new Properties();
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+
+ // Start locator0 with props.
+ //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
+ locator0.invoke(new CacheSerializableRunnable("Starting first locator on port " + port0) {
+
+ @Override
+ public void run2() throws CacheException {
+ try {
+ TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
+
+ Locator.startLocatorAndDS(port0, logFile0, props);
+ } catch (IOException e) {
+ fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
+ }
+ }
+ });
+
+ // Start a new member to add it to discovery set of locator0.
+ member.invoke(new CacheSerializableRunnable("Start a member") {
+
+ @Override
+ public void run2() throws CacheException {
+ disconnectFromDS();
+ TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
+ InternalDistributedSystem.connect(props);
+ }
+ });
+
+ // Start locator1 with props.
+ //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port1+"]");
+ locator1.invoke(new CacheSerializableRunnable("Starting second locator on port " + port1) {
+
+ @Override
+ public void run2() throws CacheException {
+ try {
+ TcpServer.TESTVERSION -= 100;
+ TcpServer.OLDTESTVERSION -= 100;
+ TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
+ TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
+ assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
+ assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
+
+ Locator.startLocatorAndDS(port1, logFile1, props);
+
+ // Start a gossip client to connect to first locator "locator0".
+ fail("this test must be fixed to work with the jgroups replacement");
+ // TODO
+// final GossipClient client = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1), 500);
+// client.register("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), 5000, false);
+
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ try {
+ // TODO
+// Vector members = client.getMembers("mygroup1",
+// new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
+// return members.size() == 2;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception");
+ }
+ return false; // NOTREACHED
+ }
+ public String description() {
+ return null;
+ }
+ };
+
+ DistributedTestCase.waitForCriterion(ev, 1000, 200, true);
+ fail("this test must be fixed to work with the jgroups replacement");
+ // TODO
+// Vector members = client.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
+// Assert.assertEquals(2, members.size());
+// Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port0)));
+// Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port1)));
+
+ } catch (IOException e) {
+ fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
+ }
+ }
+ });
+
+ // Stop first locator currently running in locator0 VM.
+ locator0.invoke(new CacheSerializableRunnable("Stopping first locator") {
+
+ @Override
+ public void run2() throws CacheException {
+ Locator.getLocator().stop();
+ disconnectFromDS();
+ }
+ });
+
+ // Restart first locator in new VM.
+ //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
+ locatorRestart0.invoke(new CacheSerializableRunnable("Restarting first locator on port " + port0) {
+
+ @Override
+ public void run2() throws CacheException {
+ try {
+ TcpServer.TESTVERSION -= 100;
+ TcpServer.OLDTESTVERSION -= 100;
+ TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
+ TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
+ assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
+ assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
+
+ Locator.startLocatorAndDS(port0, logFile0, props);
+
+ // A new gossip client with new GOSSIPVERSION must be able
+ // to connect with new locator on port1, remote locator.
+ // Reuse locator0 VM.
+ fail("this test must be fixed to work with the jgroups replacement");
+ // TODO
+// final GossipClient client2 = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1), 500);
+// Vector<IpAddress> members = client2.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), true, 5000);
+// Assert.assertEquals(2, members.size());
+ // As they are coming from other locator, their pid is of other locator process.
+// getLogWriter().info(members.get(0) + " " + members.get(1));
+
+ // TODO
+// for (IpAddress ipAddr : members) {
+// int port = ipAddr.getPort();
+// String hostname = ipAddr.getIpAddress().getHostAddress();
+// int pid = ipAddr.getProcessId();
+// Assert.assertTrue(" " + ipAddr, port == port0 || port == port1);
+// Assert.assertTrue(" " + ipAddr, hostname.equals(InetAddress.getLocalHost().getHostAddress()));
+// Assert.assertTrue(" " + ipAddr, pid == locator1.getPid());
+// }
+
+ } catch (IOException e) {
+ fail("Locator0 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
+ }
+ }
+ });
+ }
+}