Posted to commits@geode.apache.org by kl...@apache.org on 2016/01/07 23:00:30 UTC

[1/3] incubator-geode git commit: Remove Disabled from names of tests. Ensure each test has a Category and add Ignore to any test that is disabled due to being broken.
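
For context, the pattern this commit applies is plain JUnit 4 annotation usage: the "Disabled" suffix comes off the class name, every test class gets a @Category marker, and a test that is still broken is kept out of the run with an explicit @Ignore instead of a misleading name. A minimal sketch of the resulting shape (FooDUnitTest and its test method are hypothetical placeholders, not classes from this commit):

    import junit.framework.TestCase;

    import org.junit.Ignore;
    import org.junit.experimental.categories.Category;

    import com.gemstone.gemfire.test.junit.categories.DistributedTest;

    // Was FooDUnitDisabledTest; the old name hid it from the test runner entirely.
    @Category(DistributedTest.class) // every test carries a category
    @Ignore("Test was disabled by renaming to DisabledTest") // broken tests are skipped explicitly
    public class FooDUnitTest extends TestCase {
      public FooDUnitTest(String name) {
        super(name);
      }

      public void testSomething() {
        // JUnit 3-style test method, discovered by its name
      }
    }

The benefit is that a disabled test now shows up in reports as ignored, with a reason, rather than silently vanishing from the suite.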

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-714 b417b275b -> 4f0776572


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java
deleted file mode 100755
index ffd5726..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitDisabledTest.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-
-import com.gemstone.gemfire.cache.EvictionAlgorithm;
-import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
-import com.gemstone.gemfire.internal.cache.lru.MemLRUCapacityController;
-
-public class EvictionDUnitDisabledTest extends EvictionTestBase {
-  private static final long serialVersionUID = 270073077723092256L;
-
-  public EvictionDUnitDisabledTest(String name) {
-    super(name);
-  }
- 
-  public void testDummyInlineNCentralizedEviction() {
-    prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
-    putData("PR1", 50, 1);
-    
-    final int expectedEviction1 = getExpectedEvictionRatioOnVm(dataStore1);
-    final int expectedEviction2 = getExpectedEvictionRatioOnVm(dataStore2);
-    
-    raiseFakeNotification(dataStore1, "PR1", expectedEviction1);
-    raiseFakeNotification(dataStore2, "PR1", expectedEviction2);
-    validateNoOfEvictions("PR1", expectedEviction1 + expectedEviction2);
-
-    putData("PR1", 4, 1);
-    validateNoOfEvictions("PR1", 4 + expectedEviction1 + expectedEviction2);
-  }
-  
-  public void testThreadPoolSize() {
-    prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
-    putData("PR1", 50, 1);
-    raiseFakeNotification(dataStore1, "PR1", getExpectedEvictionRatioOnVm(dataStore1));
-    verifyThreadPoolTaskCount(HeapEvictor.MAX_EVICTOR_THREADS);
-  }
-  
-  public void testCentralizedEvictionnForDistributedRegionWithDummyEvent() {
-    prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
-    createDistributedRegion();
-    putDataInDistributedRegion(50, 1);
-    raiseFakeNotification(dataStore1, "DR1", getExpectedEvictionRatioOnVm(dataStore1));
-  }
-
-  /**
-   * Test Case Description: 2 VMs, 2 PRs, 4 buckets per PR. PR1 has action
-   * Local Destroy and PR2 has action Overflow To Disk.
-   * 
-   * Test Case verifies: that eviction-up and eviction-down events are raised
-   * naturally, and that centralized and inline eviction take place. All this
-   * verification is done through logs. It also verifies that during eviction,
-   * if one node goes down and then comes up again causing GII to take place,
-   * the system does not throw an OOME.
-   */
-  public void testEvictionWithNodeDown() {
-    prepareScenario2(EvictionAlgorithm.LRU_HEAP, "PR3", "PR4");
-    putDataInDataStore3("PR3", 100, 1);
-    fakeNotification();
-    print("PR3");
-    killVm();
-    bringVMBackToLife();
-    assertEquals(100, getPRSize("PR3"));
-    assertEquals(0, getPRSize("PR4"));
-  }
-  
-  public void testEntryLruEvictions() {
-    int extraEntries=1;
-    createCache();
-    maxEnteries=3;
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
-    
-    final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
-    getLogWriter().info(
-        "PR- " +pr.getEvictionAttributes().getMaximum());
-    
-    for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
-      pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
-    }
-     
-    assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
-  }
-  
-  
-  public void testEntryLru() {
-    createCache();
-    maxEnteries=12;
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
-    
-    final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
-    getLogWriter().info(
-        "PR- " +pr.getEvictionAttributes().getMaximum());
-    for (int i = 0; i < 3; i++) {
-      // assume mod-based hashing for bucket creation
-      pr.put(new Integer(i), "value0");
-      pr.put(new Integer(i
-          + pr.getPartitionAttributes().getTotalNumBuckets()), "value1");
-      pr.put(new Integer(i
-          + (pr.getPartitionAttributes().getTotalNumBuckets()) * 2),
-          "value2");
-    }
-    pr.put(new Integer(3), "value0");
-    
-    for (int i = 0; i < 2; i++) {
-      pr.put(new Integer(i
-          + pr.getPartitionAttributes().getTotalNumBuckets())*3, "value1");
-    }
-   assertEquals(0,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
-  }
-
-  public void testCheckEntryLruEvictionsIn1DataStore() {
-    int extraEntries=10;
-    createCache();
-    maxEnteries=20;
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 5, 1, 1000,maxEnteries);
-    
-    final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
-    getLogWriter().info(
-        "PR- " +pr.getEvictionAttributes().getMaximum());
-    
-    for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
-      pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
-    }
-     
-    assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
-    
-    for (final Iterator i = pr.getDataStore().getAllLocalBuckets().iterator(); i
-        .hasNext();) {
-      final Map.Entry entry = (Map.Entry)i.next();
-      final BucketRegion bucketRegion = (BucketRegion)entry.getValue();
-      if (bucketRegion == null) {
-        continue;
-      }
-      getLogWriter().info(
-          "FINAL bucket= " + bucketRegion.getFullPath() + "size= "
-              + bucketRegion.size() + "  count= "+bucketRegion.entryCount());
-      assertEquals(4,bucketRegion.size());
-    }
-  }
-  
-  public void testCheckEntryLruEvictionsIn2DataStore() {
-    maxEnteries=20;
-    prepareScenario1(EvictionAlgorithm.LRU_ENTRY,maxEnteries);
-    putData("PR1", 60, 1);
-    validateNoOfEvictions("PR1", 20);
-  }
-  
-  
-  public void testMemLruForPRAndDR() {
-    createCache();
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_MEMORY, "PR1", 4, 1, 1000,40);
-    createDistRegionWithMemEvictionAttr();
-    PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
-    DistributedRegion dr = (DistributedRegion)cache.getRegion("DR1");
-    
-    assertEquals(pr.getLocalMaxMemory(), pr.getEvictionAttributes().getMaximum());
-    assertEquals(MemLRUCapacityController.DEFAULT_MAXIMUM_MEGABYTES, dr.getEvictionAttributes().getMaximum());
-   
-   for (int i = 0; i < 41; i++) {
-     pr.put(new Integer(i), new byte[1 * 1024 * 1024]);
-    }
-   
-   assertTrue(1<=((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
-   assertTrue(((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions()<=2);
-   
-   for (int i = 0; i < 11; i++) {
-     dr.put(new Integer(i), new byte[1 * 1024 * 1024]);
-    }
-  
-   assertTrue(1<=((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions());
-   assertTrue(((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions()<=2);
-  }
-  
-  public void testEachTaskSize() {
-    createCache();
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR1", 6, 1,
-        1000, 40);
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR2", 10, 1,
-        1000, 40);
-    createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR3", 15, 1,
-        1000, 40);
-    createDistRegion();
-
-    ArrayList<Integer> taskSetSizes = getTestTaskSetSizes();
-    if (taskSetSizes != null) {
-      for (Integer size : taskSetSizes) {
-        assertEquals(8, size.intValue());
-      }
-    }
-
-    /*
-    final PartitionedRegion pr1 = (PartitionedRegion)cache.getRegion("PR1");
-    final PartitionedRegion pr2 = (PartitionedRegion)cache.getRegion("PR2");
-    final PartitionedRegion pr3 = (PartitionedRegion)cache.getRegion("PR3");
-    final DistributedRegion dr1 = (DistributedRegion)cache.getRegion("DR1");
-    
-    for (int counter = 1; counter <= 18; counter++) {
-      pr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
-    }
-    getLogWriter().info("Size of PR1 before eviction = "+ pr1.size());
-    
-    for (int counter = 1; counter <= 30; counter++) {
-      pr2.put(new Integer(counter), new byte[1 * 1024 * 1024]);
-    }
-    getLogWriter().info("Size of PR2 before eviction = "+ pr2.size());
-    
-    for (int counter = 1; counter <= 45; counter++) {
-      pr3.put(new Integer(counter), new byte[1 * 1024 * 1024]);
-    }
-    getLogWriter().info("Size of PR3 before eviction = "+ pr3.size());
-    
-    for (int counter = 1; counter <= 150; counter++) {
-      dr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
-    }
-    getLogWriter().info("Size of DR1 before eviction = "+ dr1.size());
-    
-    
-    getLogWriter().info("Size of PR1 after eviction = "+ pr1.size());
-    getLogWriter().info("Size of PR2 after eviction = "+ pr2.size());
-    getLogWriter().info("Size of PR3 after eviction = "+ pr3.size());
-    getLogWriter().info("Size of PR4 after eviction = "+ dr1.size());*/
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java
new file mode 100755
index 0000000..33807b7
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/EvictionDUnitTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.EvictionAlgorithm;
+import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
+import com.gemstone.gemfire.internal.cache.lru.MemLRUCapacityController;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class EvictionDUnitTest extends EvictionTestBase {
+  private static final long serialVersionUID = 270073077723092256L;
+
+  public EvictionDUnitTest(String name) {
+    super(name);
+  }
+ 
+  public void testDummyInlineNCentralizedEviction() {
+    prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
+    putData("PR1", 50, 1);
+    
+    final int expectedEviction1 = getExpectedEvictionRatioOnVm(dataStore1);
+    final int expectedEviction2 = getExpectedEvictionRatioOnVm(dataStore2);
+    
+    raiseFakeNotification(dataStore1, "PR1", expectedEviction1);
+    raiseFakeNotification(dataStore2, "PR1", expectedEviction2);
+    validateNoOfEvictions("PR1", expectedEviction1 + expectedEviction2);
+
+    putData("PR1", 4, 1);
+    validateNoOfEvictions("PR1", 4 + expectedEviction1 + expectedEviction2);
+  }
+  
+  public void testThreadPoolSize() {
+    prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
+    putData("PR1", 50, 1);
+    raiseFakeNotification(dataStore1, "PR1", getExpectedEvictionRatioOnVm(dataStore1));
+    verifyThreadPoolTaskCount(HeapEvictor.MAX_EVICTOR_THREADS);
+  }
+  
+  public void testCentralizedEvictionnForDistributedRegionWithDummyEvent() {
+    prepareScenario1(EvictionAlgorithm.LRU_HEAP,0);
+    createDistributedRegion();
+    putDataInDistributedRegion(50, 1);
+    raiseFakeNotification(dataStore1, "DR1", getExpectedEvictionRatioOnVm(dataStore1));
+  }
+
+  /**
+   * Test Case Description: 2 VMs, 2 PRs, 4 buckets per PR. PR1 has action
+   * Local Destroy and PR2 has action Overflow To Disk.
+   * 
+   * Test Case verifies: that eviction-up and eviction-down events are raised
+   * naturally, and that centralized and inline eviction take place. All this
+   * verification is done through logs. It also verifies that during eviction,
+   * if one node goes down and then comes up again causing GII to take place,
+   * the system does not throw an OOME.
+   */
+  public void testEvictionWithNodeDown() {
+    prepareScenario2(EvictionAlgorithm.LRU_HEAP, "PR3", "PR4");
+    putDataInDataStore3("PR3", 100, 1);
+    fakeNotification();
+    print("PR3");
+    killVm();
+    bringVMBackToLife();
+    assertEquals(100, getPRSize("PR3"));
+    assertEquals(0, getPRSize("PR4"));
+  }
+  
+  public void testEntryLruEvictions() {
+    int extraEntries=1;
+    createCache();
+    maxEnteries=3;
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
+    
+    final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+    getLogWriter().info(
+        "PR- " +pr.getEvictionAttributes().getMaximum());
+    
+    for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
+      pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+    }
+     
+    assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+  }
+  
+  
+  public void testEntryLru() {
+    createCache();
+    maxEnteries=12;
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 4, 1, 1000,maxEnteries);
+    
+    final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+    getLogWriter().info(
+        "PR- " +pr.getEvictionAttributes().getMaximum());
+    for (int i = 0; i < 3; i++) {
+      // assume mod-based hashing for bucket creation
+      pr.put(new Integer(i), "value0");
+      pr.put(new Integer(i
+          + pr.getPartitionAttributes().getTotalNumBuckets()), "value1");
+      pr.put(new Integer(i
+          + (pr.getPartitionAttributes().getTotalNumBuckets()) * 2),
+          "value2");
+    }
+    pr.put(new Integer(3), "value0");
+    
+    for (int i = 0; i < 2; i++) {
+      pr.put(new Integer(i
+          + pr.getPartitionAttributes().getTotalNumBuckets())*3, "value1");
+    }
+   assertEquals(0,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+  }
+
+  public void testCheckEntryLruEvictionsIn1DataStore() {
+    int extraEntries=10;
+    createCache();
+    maxEnteries=20;
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_ENTRY, "PR1", 5, 1, 1000,maxEnteries);
+    
+    final PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+    getLogWriter().info(
+        "PR- " +pr.getEvictionAttributes().getMaximum());
+    
+    for (int counter = 1; counter <= maxEnteries+extraEntries; counter++) {
+      pr.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+    }
+     
+    assertEquals(extraEntries,((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+    
+    for (final Iterator i = pr.getDataStore().getAllLocalBuckets().iterator(); i
+        .hasNext();) {
+      final Map.Entry entry = (Map.Entry)i.next();
+      final BucketRegion bucketRegion = (BucketRegion)entry.getValue();
+      if (bucketRegion == null) {
+        continue;
+      }
+      getLogWriter().info(
+          "FINAL bucket= " + bucketRegion.getFullPath() + "size= "
+              + bucketRegion.size() + "  count= "+bucketRegion.entryCount());
+      assertEquals(4,bucketRegion.size());
+    }
+  }
+  
+  public void testCheckEntryLruEvictionsIn2DataStore() {
+    maxEnteries=20;
+    prepareScenario1(EvictionAlgorithm.LRU_ENTRY,maxEnteries);
+    putData("PR1", 60, 1);
+    validateNoOfEvictions("PR1", 20);
+  }
+  
+  
+  public void testMemLruForPRAndDR() {
+    createCache();
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_MEMORY, "PR1", 4, 1, 1000,40);
+    createDistRegionWithMemEvictionAttr();
+    PartitionedRegion pr = (PartitionedRegion)cache.getRegion("PR1");
+    DistributedRegion dr = (DistributedRegion)cache.getRegion("DR1");
+    
+    assertEquals(pr.getLocalMaxMemory(), pr.getEvictionAttributes().getMaximum());
+    assertEquals(MemLRUCapacityController.DEFAULT_MAXIMUM_MEGABYTES, dr.getEvictionAttributes().getMaximum());
+   
+   for (int i = 0; i < 41; i++) {
+     pr.put(new Integer(i), new byte[1 * 1024 * 1024]);
+    }
+   
+   assertTrue(1<=((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions());
+   assertTrue(((AbstractLRURegionMap)pr.entries)._getLruList().stats().getEvictions()<=2);
+   
+   for (int i = 0; i < 11; i++) {
+     dr.put(new Integer(i), new byte[1 * 1024 * 1024]);
+    }
+  
+   assertTrue(1<=((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions());
+   assertTrue(((AbstractLRURegionMap)dr.entries)._getLruList().stats().getEvictions()<=2);
+  }
+  
+  public void testEachTaskSize() {
+    createCache();
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR1", 6, 1,
+        1000, 40);
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR2", 10, 1,
+        1000, 40);
+    createPartitionedRegion(true, EvictionAlgorithm.LRU_HEAP, "PR3", 15, 1,
+        1000, 40);
+    createDistRegion();
+
+    ArrayList<Integer> taskSetSizes = getTestTaskSetSizes();
+    if (taskSetSizes != null) {
+      for (Integer size : taskSetSizes) {
+        assertEquals(8, size.intValue());
+      }
+    }
+
+    /*
+    final PartitionedRegion pr1 = (PartitionedRegion)cache.getRegion("PR1");
+    final PartitionedRegion pr2 = (PartitionedRegion)cache.getRegion("PR2");
+    final PartitionedRegion pr3 = (PartitionedRegion)cache.getRegion("PR3");
+    final DistributedRegion dr1 = (DistributedRegion)cache.getRegion("DR1");
+    
+    for (int counter = 1; counter <= 18; counter++) {
+      pr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+    }
+    getLogWriter().info("Size of PR1 before eviction = "+ pr1.size());
+    
+    for (int counter = 1; counter <= 30; counter++) {
+      pr2.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+    }
+    getLogWriter().info("Size of PR2 before eviction = "+ pr2.size());
+    
+    for (int counter = 1; counter <= 45; counter++) {
+      pr3.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+    }
+    getLogWriter().info("Size of PR3 before eviction = "+ pr3.size());
+    
+    for (int counter = 1; counter <= 150; counter++) {
+      dr1.put(new Integer(counter), new byte[1 * 1024 * 1024]);
+    }
+    getLogWriter().info("Size of DR1 before eviction = "+ dr1.size());
+    
+    
+    getLogWriter().info("Size of PR1 after eviction = "+ pr1.size());
+    getLogWriter().info("Size of PR2 after eviction = "+ pr2.size());
+    getLogWriter().info("Size of PR3 after eviction = "+ pr3.size());
+    getLogWriter().info("Size of PR4 after eviction = "+ dr1.size());*/
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
index 386f8ce..060aea7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapEvictionDUnitTest.java
@@ -34,7 +34,7 @@ import dunit.VM;
  * Performs eviction dunit tests for off-heap memory.
  * @author rholmes
  */
-public class OffHeapEvictionDUnitTest extends EvictionDUnitDisabledTest {
+public class OffHeapEvictionDUnitTest extends EvictionDUnitTest {
   public OffHeapEvictionDUnitTest(String name) {
     super(name);
   }  

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java
deleted file mode 100644
index 6b957e8..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitDisabledTest.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.ha;
-
-import java.util.Iterator;
-import java.util.Properties;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.internal.PoolImpl;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache30.ClientServerTestCase;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.internal.AvailablePort;
-
-import dunit.DistributedTestCase;
-import dunit.Host;
-import dunit.VM;
-
-/**
- * This is Dunit test for bug 36109. This test has a cache-client having a primary
- * and a secondary cache-server as its endpoint. Primary does some operations
- * and is stopped, the client fails over to secondary and does some operations
- * and it is verified that the 'invalidates' stats at the client is same as the
- * total number of operations done by both primary and secondary. The bug was
- * appearing because invalidate stats was part of Endpoint which used to get
- * closed during fail over , with the failed endpoint getting closed. This bug
- * has been fixed by moving the invalidate stat to be part of our implementation.
- * 
- * @author Dinesh Patel
- * 
- */
-public class StatsBugDUnitDisabledTest extends DistributedTestCase
-{
-  /** primary cache server */
-  VM primary = null;
-
-  /** secondary cache server */
-  VM secondary = null;
-
-  /** the cache client */
-  VM client1 = null;
-
-  /** the cache */
-  private static Cache cache = null;
-
-  /** port for the primary cache server */
-  private static int PORT1;
-
-  /** port for the secondary cache server */
-  private static int PORT2;
-
-  /** name of the test region */
-  private static final String REGION_NAME = "StatsBugDUnitTest_Region";
-
-  /** bridge-writer instance (used to get connection proxy handle) */
-  private static PoolImpl pool = null;
-
-  /** total number of cache servers */
-  private static final int TOTAL_SERVERS = 2;
-
-  /** number of puts done by each server */
-  private static final int PUTS_PER_SERVER = 10;
-
-  /** prefix added to the keys of events generated on primary */
-  private static final String primaryPrefix = "primary_";
-
-  /** prefix added to the keys of events generated on secondary */
-  private static final String secondaryPrefix = "secondary_";
-
-  /**
-   * Constructor
-   * 
-   * @param name -
-   *          name for this test instance
-   */
-  public StatsBugDUnitDisabledTest(String name) {
-    super(name);
-  }
-
-  /**
-   * Creates the primary and the secondary cache servers
-   * 
-   * @throws Exception -
-   *           thrown if any problem occurs in initializing the test
-   */
-  public void setUp() throws Exception
-  {
-    disconnectAllFromDS();
-    super.setUp();
-    final Host host = Host.getHost(0);
-    primary = host.getVM(0);
-    secondary = host.getVM(1);
-    client1 = host.getVM(2);
-    PORT1 = ((Integer)primary.invoke(StatsBugDUnitDisabledTest.class,
-        "createServerCache")).intValue();
-    PORT2 = ((Integer)secondary.invoke(StatsBugDUnitDisabledTest.class,
-        "createServerCache")).intValue();
-  }
-
-  /**
-   * Create the cache
-   * 
-   * @param props -
-   *          properties for DS
-   * @return the cache instance
-   * @throws Exception -
-   *           thrown if any problem occurs in cache creation
-   */
-  private Cache createCache(Properties props) throws Exception
-  {
-    DistributedSystem ds = getSystem(props);
-    ds.disconnect();
-    ds = getSystem(props);
-    Cache cache = null;
-    cache = CacheFactory.create(ds);
-    if (cache == null) {
-      throw new Exception("CacheFactory.create() returned null ");
-    }
-    return cache;
-  }
-
-  /**
-   * close the cache instances in server and client during tearDown
-   * 
-   * @throws Exception
-   *           thrown if any problem occurs in closing cache
-   */
-  public void tearDown2() throws Exception
-  {
-    super.tearDown2();
-    // close client
-    client1.invoke(StatsBugDUnitDisabledTest.class, "closeCache");
-
-    // close server
-    primary.invoke(StatsBugDUnitDisabledTest.class, "closeCache");
-    secondary.invoke(StatsBugDUnitDisabledTest.class, "closeCache");
-  }
-
-  /**
-   * This test does the following:<br>
-   * 1)Create and populate the client<br>
-   * 2)Do some operations from the primary cache-server<br>
-   * 3)Stop the primary cache-server<br>
-   * 4)Wait some time to allow client to failover to secondary and do some
-   * operations from secondary<br>
-   * 5)Verify that the invalidates stats at the client accounts for the
-   * operations done by both, primary and secondary.
-   * 
-   * @throws Exception -
-   *           thrown if any problem occurs in test execution
-   */
-  public void testBug36109() throws Exception
-  {
-    getLogWriter().info("testBug36109 : BEGIN");
-    client1.invoke(StatsBugDUnitDisabledTest.class, "createClientCacheForInvalidates", new Object[] {
-        getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2) });
-    client1.invoke(StatsBugDUnitDisabledTest.class, "prepopulateClient");
-    primary.invoke(StatsBugDUnitDisabledTest.class, "doEntryOperations",
-        new Object[] { primaryPrefix });
-    pause(3000);
-    primary.invoke(StatsBugDUnitDisabledTest.class, "stopServer");
-    try {
-      Thread.sleep(5000);
-    }
-    catch (InterruptedException ignore) {
-      fail("interrupted");
-    }
-
-    secondary.invoke(StatsBugDUnitDisabledTest.class, "doEntryOperations",
-        new Object[] { secondaryPrefix });
-    try {
-      Thread.sleep(5000);
-    }
-    catch (InterruptedException ignore) {
-      fail("interrupted");
-    }
-
-    client1.invoke(StatsBugDUnitDisabledTest.class, "verifyNumInvalidates");
-    getLogWriter().info("testBug36109 : END");
-  }
-
-  /**
-   * Creates and starts the cache-server
-   * 
-   * @return - the port on which cache-server is running
-   * @throws Exception -
-   *           thrown if any problem occurs in cache/server creation
-   */
-  public static Integer createServerCache() throws Exception
-  {
-    StatsBugDUnitDisabledTest test = new StatsBugDUnitDisabledTest("temp");
-    Properties props = new Properties();
-    cache = test.createCache(props);
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    factory.setDataPolicy(DataPolicy.REPLICATE);
-
-    RegionAttributes attrs = factory.create();
-
-    cache.createRegion(REGION_NAME, attrs);
-    CacheServer server = cache.addCacheServer();
-    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    server.setPort(port);
-    server.setNotifyBySubscription(false);
-    server.setSocketBufferSize(32768);
-    server.start();
-    getLogWriter().info("Server started at PORT = " + port);
-    return new Integer(port);
-  }
-
-  /**
-   * Initializes the cache client
-   * 
-   * @param host -
-   *          host on which the cache servers are running
-   * @param port1 -
-   *          port for the primary cache-server
-   * @param port2 -
-   *          port for the secondary cache-server
-   * @throws Exception -
-   *          thrown if any problem occurs in initializing the client
-   */
-  public static void createClientCache(String host, Integer port1, Integer port2)
-      throws Exception
-  {
-    StatsBugDUnitDisabledTest test = new StatsBugDUnitDisabledTest("temp");
-    cache = test.createCache(createProperties1());
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
-    RegionAttributes attrs = factory.create();
-    Region region = cache.createRegion(REGION_NAME, attrs);
-    region.registerInterest("ALL_KEYS");
-    getLogWriter().info("Client cache created");
-  }
-
-  /**
-   * Initializes the cache client
-   * 
-   * @param host -
-   *          host on which the cache servers are running
-   * @param port1 -
-   *          port for the primary cache-server
-   * @param port2 -
-   *          port for the secondary cache-server
-   * @throws Exception -
-   *          thrown if any problem occurs in initializing the client
-   */
-  public static void createClientCacheForInvalidates(String host, Integer port1, Integer port2)
-      throws Exception
-  {
-    StatsBugDUnitDisabledTest test = new StatsBugDUnitDisabledTest("temp");
-    cache = test.createCache(createProperties1());
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
-    RegionAttributes attrs = factory.create();
-    Region region = cache.createRegion(REGION_NAME, attrs);
-    region.registerInterest("ALL_KEYS", false, false);
-    getLogWriter().info("Client cache created");
-  }
-  
-  /**
-   * Verify that the invalidates stat at the client accounts for the operations
-   * done by both primary and secondary.
-   * 
-   */
-  public static void verifyNumInvalidates()
-  {
-    long invalidatesRecordedByStats = pool.getInvalidateCount();
-    getLogWriter().info(
-        "invalidatesRecordedByStats = " + invalidatesRecordedByStats);
-
-    int expectedInvalidates = TOTAL_SERVERS * PUTS_PER_SERVER;
-    getLogWriter().info("expectedInvalidates = " + expectedInvalidates);
-
-    if (invalidatesRecordedByStats != expectedInvalidates) {
-      fail("Invalidates received by client(" + invalidatesRecordedByStats
-          + ") does not match with the number of operations("
-          + expectedInvalidates + ") done at server");
-    }
-  }
-
-  /**
-   * Stops the cache server
-   * 
-   */
-  public static void stopServer()
-  {
-    try {
-      Iterator iter = cache.getCacheServers().iterator();
-      if (iter.hasNext()) {
-        CacheServer server = (CacheServer)iter.next();
-        server.stop();
-      }
-    }
-    catch (Exception e) {
-      fail("failed while stopServer()" + e);
-    }
-  }
-
-  /**
-   * create properties for a loner VM
-   */
-  private static Properties createProperties1()
-  {
-    Properties props = new Properties();
-    props.setProperty("mcast-port", "0");
-    props.setProperty("locators", "");
-    return props;
-  }
-
-
-  /**
-   * Do PUT operations
-   * 
-   * @param keyPrefix -
-   *          string prefix for the keys of all the entries to be put
-   * @throws Exception -
-   *           thrown if any exception occurs in doing PUTs
-   */
-  public static void doEntryOperations(String keyPrefix) throws Exception
-  {
-    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-    for (int i = 0; i < PUTS_PER_SERVER; i++) {
-      r1.put(keyPrefix + i, keyPrefix + "val-" + i);
-    }
-  }
-
-  /**
-   * Prepopulate the client with the entries that will be put by the cache-servers
-   * 
-   * @throws Exception
-   */
-  public static void prepopulateClient() throws Exception
-  {
-    doEntryOperations(primaryPrefix);
-    doEntryOperations(secondaryPrefix);
-  }
-
-  /**
-   * Close the cache
-   * 
-   */
-  public static void closeCache()
-  {
-    if (cache != null && !cache.isClosed()) {
-      cache.close();
-      cache.getDistributedSystem().disconnect();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java
new file mode 100644
index 0000000..3d09f80
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ha/StatsBugDUnitTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.ha;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * This is a DUnit test for bug 36109. The test has a cache client with a primary
+ * and a secondary cache server as its endpoints. The primary does some operations
+ * and is then stopped; the client fails over to the secondary and does some more
+ * operations, and the test verifies that the 'invalidates' stat at the client
+ * equals the total number of operations done by both primary and secondary. The
+ * bug appeared because the invalidate stat was part of the Endpoint, which was
+ * closed during failover along with the failed endpoint. The bug has been fixed
+ * by moving the invalidate stat to be part of our implementation.
+ * 
+ * @author Dinesh Patel
+ * 
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class StatsBugDUnitTest extends DistributedTestCase
+{
+  /** primary cache server */
+  VM primary = null;
+
+  /** secondary cache server */
+  VM secondary = null;
+
+  /** the cache client */
+  VM client1 = null;
+
+  /** the cache */
+  private static Cache cache = null;
+
+  /** port for the primary cache server */
+  private static int PORT1;
+
+  /** port for the secondary cache server */
+  private static int PORT2;
+
+  /** name of the test region */
+  private static final String REGION_NAME = "StatsBugDUnitTest_Region";
+
+  /** bridge-writer instance (used to get connection proxy handle) */
+  private static PoolImpl pool = null;
+
+  /** total number of cache servers */
+  private static final int TOTAL_SERVERS = 2;
+
+  /** number of puts done by each server */
+  private static final int PUTS_PER_SERVER = 10;
+
+  /** prefix added to the keys of events generated on primary */
+  private static final String primaryPrefix = "primary_";
+
+  /** prefix added to the keys of events generated on secondary */
+  private static final String secondaryPrefix = "secondary_";
+
+  /**
+   * Constructor
+   * 
+   * @param name -
+   *          name for this test instance
+   */
+  public StatsBugDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Creates the primary and the secondary cache servers
+   * 
+   * @throws Exception -
+   *           thrown if any problem occurs in initializing the test
+   */
+  public void setUp() throws Exception
+  {
+    disconnectAllFromDS();
+    super.setUp();
+    final Host host = Host.getHost(0);
+    primary = host.getVM(0);
+    secondary = host.getVM(1);
+    client1 = host.getVM(2);
+    PORT1 = ((Integer)primary.invoke(StatsBugDUnitTest.class,
+        "createServerCache")).intValue();
+    PORT2 = ((Integer)secondary.invoke(StatsBugDUnitTest.class,
+        "createServerCache")).intValue();
+  }
+
+  /**
+   * Create the cache
+   * 
+   * @param props -
+   *          properties for DS
+   * @return the cache instance
+   * @throws Exception -
+   *           thrown if any problem occurs in cache creation
+   */
+  private Cache createCache(Properties props) throws Exception
+  {
+    DistributedSystem ds = getSystem(props);
+    ds.disconnect();
+    ds = getSystem(props);
+    Cache cache = null;
+    cache = CacheFactory.create(ds);
+    if (cache == null) {
+      throw new Exception("CacheFactory.create() returned null ");
+    }
+    return cache;
+  }
+
+  /**
+   * close the cache instances in server and client during tearDown
+   * 
+   * @throws Exception
+   *           thrown if any problem occurs in closing cache
+   */
+  public void tearDown2() throws Exception
+  {
+    super.tearDown2();
+    // close client
+    client1.invoke(StatsBugDUnitTest.class, "closeCache");
+
+    // close server
+    primary.invoke(StatsBugDUnitTest.class, "closeCache");
+    secondary.invoke(StatsBugDUnitTest.class, "closeCache");
+  }
+
+  /**
+   * This test does the following:<br>
+   * 1)Create and populate the client<br>
+   * 2)Do some operations from the primary cache-server<br>
+   * 3)Stop the primary cache-server<br>
+   * 4)Wait some time to allow client to failover to secondary and do some
+   * operations from secondary<br>
+   * 5)Verify that the invalidates stats at the client accounts for the
+   * operations done by both, primary and secondary.
+   * 
+   * @throws Exception -
+   *           thrown if any problem occurs in test execution
+   */
+  public void testBug36109() throws Exception
+  {
+    getLogWriter().info("testBug36109 : BEGIN");
+    client1.invoke(StatsBugDUnitTest.class, "createClientCacheForInvalidates", new Object[] {
+        getServerHostName(Host.getHost(0)), new Integer(PORT1), new Integer(PORT2) });
+    client1.invoke(StatsBugDUnitTest.class, "prepopulateClient");
+    primary.invoke(StatsBugDUnitTest.class, "doEntryOperations",
+        new Object[] { primaryPrefix });
+    pause(3000);
+    primary.invoke(StatsBugDUnitTest.class, "stopServer");
+    try {
+      Thread.sleep(5000);
+    }
+    catch (InterruptedException ignore) {
+      fail("interrupted");
+    }
+
+    secondary.invoke(StatsBugDUnitTest.class, "doEntryOperations",
+        new Object[] { secondaryPrefix });
+    try {
+      Thread.sleep(5000);
+    }
+    catch (InterruptedException ignore) {
+      fail("interrupted");
+    }
+
+    client1.invoke(StatsBugDUnitTest.class, "verifyNumInvalidates");
+    getLogWriter().info("testBug36109 : END");
+  }
+
+  /**
+   * Creates and starts the cache-server
+   * 
+   * @return - the port on which cache-server is running
+   * @throws Exception -
+   *           thrown if any problem occurs in cache/server creation
+   */
+  public static Integer createServerCache() throws Exception
+  {
+    StatsBugDUnitTest test = new StatsBugDUnitTest("temp");
+    Properties props = new Properties();
+    cache = test.createCache(props);
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+
+    RegionAttributes attrs = factory.create();
+
+    cache.createRegion(REGION_NAME, attrs);
+    CacheServer server = cache.addCacheServer();
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    server.setPort(port);
+    server.setNotifyBySubscription(false);
+    server.setSocketBufferSize(32768);
+    server.start();
+    getLogWriter().info("Server started at PORT = " + port);
+    return new Integer(port);
+  }
+
+  /**
+   * Initializes the cache client
+   * 
+   * @param host -
+   *          host on which the cache servers are running
+   * @param port1 -
+   *          port for the primary cache-server
+   * @param port2 -
+   *          port for the secondary cache-server
+   * @throws Exception -
+   *          thrown if any problem occurs in initializing the client
+   */
+  public static void createClientCache(String host, Integer port1, Integer port2)
+      throws Exception
+  {
+    StatsBugDUnitTest test = new StatsBugDUnitTest("temp");
+    cache = test.createCache(createProperties1());
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
+    RegionAttributes attrs = factory.create();
+    Region region = cache.createRegion(REGION_NAME, attrs);
+    region.registerInterest("ALL_KEYS");
+    getLogWriter().info("Client cache created");
+  }
+
+  /**
+   * Initializes the cache client
+   * 
+   * @param host -
+   *          host on which the cache servers are running
+   * @param port1 -
+   *          port for the primary cache-server
+   * @param port2 -
+   *          port for the secondary cache-server
+   * @throws Exception -
+   *          thrown if any problem occurs in initializing the client
+   */
+  public static void createClientCacheForInvalidates(String host, Integer port1, Integer port2)
+      throws Exception
+  {
+    StatsBugDUnitTest test = new StatsBugDUnitTest("temp");
+    cache = test.createCache(createProperties1());
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    pool = (PoolImpl)ClientServerTestCase.configureConnectionPool(factory, host, new int[] {port1.intValue(),port2.intValue()}, true, -1, 3, null);
+    RegionAttributes attrs = factory.create();
+    Region region = cache.createRegion(REGION_NAME, attrs);
+    region.registerInterest("ALL_KEYS", false, false);
+    getLogWriter().info("Client cache created");
+  }
+  
+  /**
+   * Verify that the invalidates stat at the client accounts for the operations
+   * done by both primary and secondary.
+   * 
+   */
+  public static void verifyNumInvalidates()
+  {
+    long invalidatesRecordedByStats = pool.getInvalidateCount();
+    getLogWriter().info(
+        "invalidatesRecordedByStats = " + invalidatesRecordedByStats);
+
+    int expectedInvalidates = TOTAL_SERVERS * PUTS_PER_SERVER;
+    getLogWriter().info("expectedInvalidates = " + expectedInvalidates);
+
+    if (invalidatesRecordedByStats != expectedInvalidates) {
+      fail("Invalidates received by client(" + invalidatesRecordedByStats
+          + ") does not match with the number of operations("
+          + expectedInvalidates + ") done at server");
+    }
+  }
+
+  /**
+   * Stops the cache server
+   * 
+   */
+  public static void stopServer()
+  {
+    try {
+      Iterator iter = cache.getCacheServers().iterator();
+      if (iter.hasNext()) {
+        CacheServer server = (CacheServer)iter.next();
+        server.stop();
+      }
+    }
+    catch (Exception e) {
+      fail("failed while stopServer()" + e);
+    }
+  }
+
+  /**
+   * create properties for a loner VM
+   */
+  private static Properties createProperties1()
+  {
+    Properties props = new Properties();
+    props.setProperty("mcast-port", "0");
+    props.setProperty("locators", "");
+    return props;
+  }
+
+
+  /**
+   * Do PUT operations
+   * 
+   * @param keyPrefix -
+   *          string prefix for the keys of all the entries to be put
+   * @throws Exception -
+   *           thrown if any exception occurs in doing PUTs
+   */
+  public static void doEntryOperations(String keyPrefix) throws Exception
+  {
+    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    for (int i = 0; i < PUTS_PER_SERVER; i++) {
+      r1.put(keyPrefix + i, keyPrefix + "val-" + i);
+    }
+  }
+
+  /**
+   * Prepopulate the client with the entries that will be put by the cache-servers
+   * 
+   * @throws Exception
+   */
+  public static void prepopulateClient() throws Exception
+  {
+    doEntryOperations(primaryPrefix);
+    doEntryOperations(secondaryPrefix);
+  }
+
+  /**
+   * Close the cache
+   * 
+   */
+  public static void closeCache()
+  {
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache.getDistributedSystem().disconnect();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
----------------------------------------------------------------------
diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
new file mode 100755
index 0000000..8eec738
--- /dev/null
+++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.test.junit.categories;
+/**
+ * JUnit Test Category that specifies that a test executes within a container
+ * environment such as an OSGi server.
+ *  
+ * @author Kirk Lund
+ */
+public interface ContainerTest {
+}
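
As an aside, a category interface like ContainerTest is a pure marker; it has no members and only takes effect when a test class references it and the build filters on it. A hypothetical usage sketch (OsgiDeploymentTest is an invented example, not a class in this commit):

    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    import com.gemstone.gemfire.test.junit.categories.ContainerTest;

    // Selected or skipped by the Gradle useJUnit { includeCategories/excludeCategories } filters.
    @Category(ContainerTest.class)
    public class OsgiDeploymentTest {
      @Test
      public void bundleStarts() {
        // would exercise behavior inside the OSGi container
      }
    }

The matching Gradle filter changes are in the build.gradle hunk later in this commit.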

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
----------------------------------------------------------------------
diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
new file mode 100755
index 0000000..4fe535b
--- /dev/null
+++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.test.junit.categories;
+/**
+ * JUnit Test Category that specifies a hydra test.
+ *  
+ * @author Kirk Lund
+ */
+public interface HydraTest {
+}


[3/3] incubator-geode git commit: Remove Disabled from names of tests. Ensure each test has a Category and add Ignore to any test that is disabled due to being broken.

Posted by kl...@apache.org.
Remove Disabled from names of tests. Ensure each test has a Category and add Ignore to any test that is disabled due to being broken.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4f077657
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4f077657
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4f077657

Branch: refs/heads/feature/GEODE-714
Commit: 4f0776572e6e4d5b61ba609627401d392c537224
Parents: b417b27
Author: Kirk Lund <kl...@pivotal.io>
Authored: Thu Jan 7 13:56:09 2016 -0800
Committer: Kirk Lund <kl...@pivotal.io>
Committed: Thu Jan 7 13:56:09 2016 -0800

----------------------------------------------------------------------
 build.gradle                                    |   14 +-
 .../PutAllWithIndexPerfDUnitDisabledTest.java   |  215 ---
 .../index/PutAllWithIndexPerfDUnitTest.java     |  221 +++
 .../cache30/SlowRecDUnitDisabledTest.java       | 1446 -----------------
 .../gemfire/cache30/SlowRecDUnitTest.java       | 1453 ++++++++++++++++++
 .../distributed/DistributedMemberDUnitTest.java |    4 +
 .../gemfire/distributed/KirkDUnitTest.java      |   17 +
 .../distributed/KirkDistributedTestSuite.java   |   31 +
 .../distributed/KirkIntegrationTestSuite.java   |   30 +
 ...cpServerBackwardCompatDUnitDisabledTest.java |  250 ---
 .../TcpServerBackwardCompatDUnitTest.java       |  256 +++
 .../cache/EvictionDUnitDisabledTest.java        |  240 ---
 .../internal/cache/EvictionDUnitTest.java       |  246 +++
 .../cache/OffHeapEvictionDUnitTest.java         |    2 +-
 .../cache/ha/StatsBugDUnitDisabledTest.java     |  368 -----
 .../internal/cache/ha/StatsBugDUnitTest.java    |  374 +++++
 .../test/junit/categories/ContainerTest.java    |   25 +
 .../test/junit/categories/HydraTest.java        |   24 +
 18 files changed, 2695 insertions(+), 2521 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4563590..3fff7f9 100755
--- a/build.gradle
+++ b/build.gradle
@@ -350,6 +350,9 @@ subprojects {
       includeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
     }    
     
     // run each test in its own vm to avoid interference issues if a test doesn't clean up
@@ -370,6 +373,8 @@ subprojects {
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
     }    
 
     beforeTest { descriptor ->
@@ -384,6 +389,9 @@ subprojects {
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
       includeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
     }    
 
     forkEvery 1
@@ -403,10 +411,14 @@ subprojects {
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.UnitTest'
       excludeCategories 'com.gemstone.gemfire.test.junit.categories.IntegrationTest'
       includeCategories 'com.gemstone.gemfire.test.junit.categories.DistributedTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.PerformanceTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
+      excludeCategories 'com.gemstone.gemfire.test.junit.categories.ContainerTest'
     }    
     
     //I'm hoping this might deal with SOME OOMEs I've seen
-    forkEvery 30
+    //forkEvery 30
+    forkEvery 1
   }
 
  // By providing a file with an arbitrary list of test classes, we can select only those
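
Note that the hunks above only ever exclude the new HydraTest and ContainerTest categories, so tests tagged with them are filtered out of every existing test task. If a dedicated task were wanted for one of them, it would follow the same useJUnit pattern; a sketch under that assumption (the hydraTest task name is hypothetical and is not added by this commit):

    task hydraTest(type: Test) {
      useJUnit {
        // run only tests tagged @Category(HydraTest.class)
        includeCategories 'com.gemstone.gemfire.test.junit.categories.HydraTest'
      }
      // one JVM per test class, matching the isolation used elsewhere in this file
      forkEvery 1
    }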

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java
deleted file mode 100644
index 22ce44b..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitDisabledTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package com.gemstone.gemfire.cache.query.internal.index;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.ClientCache;
-import com.gemstone.gemfire.cache.client.ClientCacheFactory;
-import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
-import com.gemstone.gemfire.cache.query.data.PortfolioPdx;
-import com.gemstone.gemfire.cache.query.dunit.RemoteQueryDUnitTest;
-import com.gemstone.gemfire.cache.server.CacheServer;
-import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.cache30.CacheTestCase;
-import com.gemstone.gemfire.internal.AvailablePort;
-
-import dunit.Host;
-import dunit.VM;
-
-/**
- * @author shobhit
- *
- */
-public class PutAllWithIndexPerfDUnitDisabledTest extends CacheTestCase {
-
-  /** The port on which the bridge server was started in this VM */
-  private static int bridgeServerPort;
-  static long timeWithoutStructTypeIndex = 0;
-  static long timeWithStructTypeIndex = 0;
-  
-  public PutAllWithIndexPerfDUnitDisabledTest(String name) {
-    super(name);
-  }
-
-  public void setUp() throws Exception {
-    super.setUp();
-    disconnectAllFromDS();
-  }
-
-  public void tearDown2() throws Exception {
-    try {
-      super.tearDown2();
-    }
-    finally {
-      disconnectAllFromDS();
-    }
-  }
-
-  public void testPutAllWithIndexes() {
-    final String name = "testRegion";
-    final Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    final int numberOfEntries = 10000;
-
-    // Start server
-    vm0.invoke(new CacheSerializableRunnable("Create Bridge Server") {
-        public void run2() throws CacheException {
-          Properties config = new Properties();
-          config.put("locators", "localhost["+getDUnitLocatorPort()+"]");
-          Cache cache = new CacheFactory(config).create();
-          AttributesFactory factory = new AttributesFactory();
-          factory.setScope(Scope.LOCAL);
-          cache.createRegionFactory(factory.create()).create(name);
-          try {
-            startBridgeServer(0, false);
-          } catch (Exception ex) {
-            fail("While starting CacheServer", ex);
-          }
-          //Create Index on empty region
-          try {
-            cache.getQueryService().createIndex("idIndex", "ID", "/"+name);
-          } catch (Exception e) {
-            fail("index creation failed", e);
-          }
-        }
-      });
-
-    // Create client region
-    final int port = vm0.invokeInt(PutAllWithIndexPerfDUnitDisabledTest.class, "getCacheServerPort");
-    final String host0 = getServerHostName(vm0.getHost());
-    vm1.invoke(new CacheSerializableRunnable("Create region") {
-        public void run2() throws CacheException {
-          Properties config = new Properties();
-          config.setProperty("mcast-port", "0");
-          ClientCache cache = new ClientCacheFactory().addPoolServer(host0, port).create();
-          AttributesFactory factory = new AttributesFactory();
-          factory.setScope(Scope.LOCAL);
-          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
-        }
-      });
-
-    vm1.invoke(new CacheSerializableRunnable("putAll() test") {
-      
-      @Override
-      public void run2() throws CacheException {
-        Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
-
-        Map warmupMap = new HashMap();
-        Map data =  new HashMap();
-        for(int i=0; i<10000; i++){
-          Object p = new PortfolioPdx(i);
-          if (i < 1000) warmupMap.put(i, p);
-          data.put(i, p);
-        }
-        
-        for (int i=0; i<10; i++) {
-          exampleRegion.putAll(warmupMap);
-        }
-        
-        long start = System.currentTimeMillis();
-        for (int i=0; i<10; i++) {
-          exampleRegion.putAll(data);
-        }
-        long end = System.currentTimeMillis();
-        timeWithoutStructTypeIndex = ((end-start)/10);
-        System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
- 
-      }
-    });
-    
-    vm0.invoke(new CacheSerializableRunnable("Remove Index and create new one") {
-      
-      @Override
-      public void run2() throws CacheException {
-        try {
-          Cache cache = CacheFactory.getAnyInstance();
-          cache.getQueryService().removeIndexes();
-          cache.getRegion(name).clear();
-          cache.getQueryService().createIndex("idIndex", "p.ID", "/"+name+" p");
-        } catch (Exception e) {
-          fail("index creation failed", e);
-        }
-      }
-    });
-
-    vm1.invoke(new CacheSerializableRunnable("putAll() test") {
-      
-      @Override
-      public void run2() throws CacheException {
-        Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
-        exampleRegion.clear();
-        Map warmupMap = new HashMap();
-        Map data =  new HashMap();
-        for(int i=0; i<10000; i++){
-          Object p = new PortfolioPdx(i);
-          if (i < 1000) warmupMap.put(i, p);
-          data.put(i, p);
-        }
-        
-        for (int i=0; i<10; i++) {
-          exampleRegion.putAll(warmupMap);
-        }
-        
-        long start = System.currentTimeMillis();
-        for (int i=0; i<10; i++) {
-          exampleRegion.putAll(data);
-        }
-        long end = System.currentTimeMillis();
-        timeWithStructTypeIndex  = ((end-start)/10);
-        System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
- 
-      }
-    });
-    
-    if (timeWithoutStructTypeIndex > timeWithStructTypeIndex) {
-      fail("putAll took more time without struct type index than simple index");
-    }
-  }
-
-  /**
-   * Starts a bridge server on the given port, using the given
-   * deserializeValues and notifyBySubscription to serve up the
-   * given region.
-   */
-  protected void startBridgeServer(int port, boolean notifyBySubscription)
-    throws IOException {
-
-    Cache cache = CacheFactory.getAnyInstance();
-    CacheServer bridge = cache.addCacheServer();
-    bridge.setPort(port);
-    bridge.start();
-    bridgeServerPort = bridge.getPort();
-  }
-
-  private static int getCacheServerPort() {
-    return bridgeServerPort;
-  }
-}
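
The test above (re-added below under its new name) is built around a warmup-then-measure timing pattern: ten untimed putAll calls to let the JVM warm up, then ten timed calls whose mean is reported. A standalone sketch of that pattern, with SimpleBench and meanMillis as hypothetical names:

    // Hypothetical helper mirroring the warmup/measure pattern used by
    // testPutAllWithIndexes: run untimed warmup iterations so the JIT and
    // caches settle, then report the mean wall-clock time of the timed runs.
    public final class SimpleBench {
      public static long meanMillis(Runnable op, int warmups, int runs) {
        for (int i = 0; i < warmups; i++) {
          op.run();                                  // untimed warmup
        }
        long start = System.currentTimeMillis();
        for (int i = 0; i < runs; i++) {
          op.run();                                  // timed iterations
        }
        return (System.currentTimeMillis() - start) / runs;
      }
    }

In the test, op corresponds to exampleRegion.putAll(data); the test then fails if the timed runs without the struct type index took longer than the runs with it.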

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java
new file mode 100644
index 0000000..5eb65c3
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/query/internal/index/PutAllWithIndexPerfDUnitTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.cache.query.internal.index;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.query.data.PortfolioPdx;
+import com.gemstone.gemfire.cache.query.dunit.RemoteQueryDUnitTest;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * @author shobhit
+ *
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class PutAllWithIndexPerfDUnitTest extends CacheTestCase {
+
+  /** The port on which the bridge server was started in this VM */
+  private static int bridgeServerPort;
+  static long timeWithoutStructTypeIndex = 0;
+  static long timeWithStructTypeIndex = 0;
+  
+  public PutAllWithIndexPerfDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    disconnectAllFromDS();
+  }
+
+  public void tearDown2() throws Exception {
+    try {
+      super.tearDown2();
+    }
+    finally {
+      disconnectAllFromDS();
+    }
+  }
+
+  public void testPutAllWithIndexes() {
+    final String name = "testRegion";
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    final int numberOfEntries = 10000;
+
+    // Start server
+    vm0.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+        public void run2() throws CacheException {
+          Properties config = new Properties();
+          config.put("locators", "localhost["+getDUnitLocatorPort()+"]");
+          Cache cache = new CacheFactory(config).create();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          cache.createRegionFactory(factory.create()).create(name);
+          try {
+            startBridgeServer(0, false);
+          } catch (Exception ex) {
+            fail("While starting CacheServer", ex);
+          }
+          //Create Index on empty region
+          try {
+            cache.getQueryService().createIndex("idIndex", "ID", "/"+name);
+          } catch (Exception e) {
+            fail("index creation failed", e);
+          }
+        }
+      });
+
+    // Create client region
+    final int port = vm0.invokeInt(PutAllWithIndexPerfDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(vm0.getHost());
+    vm1.invoke(new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+          Properties config = new Properties();
+          config.setProperty("mcast-port", "0");
+          ClientCache cache = new ClientCacheFactory().addPoolServer(host0, port).create();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(name);
+        }
+      });
+
+    vm1.invoke(new CacheSerializableRunnable("putAll() test") {
+      
+      @Override
+      public void run2() throws CacheException {
+        Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
+
+        Map warmupMap = new HashMap();
+        Map data =  new HashMap();
+        for (int i = 0; i < numberOfEntries; i++) {
+          Object p = new PortfolioPdx(i);
+          if (i < 1000) warmupMap.put(i, p);
+          data.put(i, p);
+        }
+        
+        for (int i=0; i<10; i++) {
+          exampleRegion.putAll(warmupMap);
+        }
+        
+        long start = System.currentTimeMillis();
+        for (int i=0; i<10; i++) {
+          exampleRegion.putAll(data);
+        }
+        long end = System.currentTimeMillis();
+        timeWithoutStructTypeIndex = ((end-start)/10);
+        System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
+ 
+      }
+    });
+    
+    vm0.invoke(new CacheSerializableRunnable("Remove Index and create new one") {
+      
+      @Override
+      public void run2() throws CacheException {
+        try {
+          Cache cache = CacheFactory.getAnyInstance();
+          cache.getQueryService().removeIndexes();
+          cache.getRegion(name).clear();
+          cache.getQueryService().createIndex("idIndex", "p.ID", "/"+name+" p");
+        } catch (Exception e) {
+          fail("index creation failed", e);
+        }
+      }
+    });
+
+    vm1.invoke(new CacheSerializableRunnable("putAll() test") {
+      
+      @Override
+      public void run2() throws CacheException {
+        Region exampleRegion = ClientCacheFactory.getAnyInstance().getRegion(name);
+        exampleRegion.clear();
+        Map warmupMap = new HashMap();
+        Map data =  new HashMap();
+        for (int i = 0; i < numberOfEntries; i++) {
+          Object p = new PortfolioPdx(i);
+          if (i < 1000) warmupMap.put(i, p);
+          data.put(i, p);
+        }
+        
+        for (int i=0; i<10; i++) {
+          exampleRegion.putAll(warmupMap);
+        }
+        
+        long start = System.currentTimeMillis();
+        for (int i=0; i<10; i++) {
+          exampleRegion.putAll(data);
+        }
+        long end = System.currentTimeMillis();
+        timeWithStructTypeIndex  = ((end-start)/10);
+        System.out.println("Total putall time for 10000 objects is: "+ ((end-start)/10) + "ms");
+ 
+      }
+    });
+    
+    if (timeWithoutStructTypeIndex > timeWithStructTypeIndex) {
+      fail("putAll took more time without struct type index than simple index");
+    }
+  }
+
+  /**
+   * Starts a bridge server on the given port, using the given
+   * deserializeValues and notifyBySubscription to serve up the
+   * given region.
+   */
+  protected void startBridgeServer(int port, boolean notifyBySubscription)
+    throws IOException {
+
+    Cache cache = CacheFactory.getAnyInstance();
+    CacheServer bridge = cache.addCacheServer();
+    bridge.setPort(port);
+    bridge.start();
+    bridgeServerPort = bridge.getPort();
+  }
+
+  private static int getCacheServerPort() {
+    return bridgeServerPort;
+  }
+}
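
Apart from the rename and the new @Category/@Ignore annotations (plus their imports), the file above matches the deleted *DisabledTest version line for line. For applying the same convention elsewhere, here is a minimal hypothetical example; the class name, category, and reason string are illustrative only, not part of this commit:

    import org.junit.Ignore;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    import com.gemstone.gemfire.test.junit.categories.DistributedTest;

    // Keep the real *DUnitTest name so tooling still finds the class,
    // declare a Category so the Gradle filters can route it, and park a
    // broken test with @Ignore instead of renaming it to *DisabledTest.
    @Category(DistributedTest.class)
    @Ignore("Disabled because it is broken; see the relevant JIRA issue")
    public class ExampleDUnitTest {
      @Test
      public void testSomething() {
        // runs under the distributedTest task once @Ignore is removed
      }
    }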

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
deleted file mode 100644
index e669f04..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitDisabledTest.java
+++ /dev/null
@@ -1,1446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache30;
-
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.cache.*;
-import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.util.*;
-import com.gemstone.gemfire.distributed.internal.*;
-import com.gemstone.gemfire.internal.tcp.Connection;
-import dunit.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Test to make sure slow receiver queuing is working
- *
- * @author darrel
- * @since 4.2.1
- */
-public class SlowRecDUnitDisabledTest extends CacheTestCase {
-
-  public SlowRecDUnitDisabledTest(String name) {
-    super(name);
-  }
-
-  // This test uses a special config for its distributed system, so
-  // setUp and tearDown must make sure that we don't reuse the ds
-  // from a previous test and that we don't leave ours around
-  // for the next test to use.
-  
-  public void setUp() throws Exception {
-    try {
-      disconnectAllFromDS();
-    } finally {
-      super.setUp();
-    }
-  }
-  public void tearDown2() throws Exception {
-    try {
-      super.tearDown2();
-    } finally {
-      disconnectAllFromDS();
-    }
-  }
-  
-  //////////////////////  Test Methods  //////////////////////
-
-  private VM getOtherVm() {
-    Host host = Host.getHost(0);
-    return host.getVM(0);
-  }
-
-  static protected Object lastCallback = null;
-
-  private void doCreateOtherVm(final Properties p, final boolean addListener) {
-    VM vm = getOtherVm();
-    vm.invoke(new CacheSerializableRunnable("create root") {
-        public void run2() throws CacheException {
-          getSystem(p);
-          createAckRegion(true, false);
-          AttributesFactory af = new AttributesFactory();
-          af.setScope(Scope.DISTRIBUTED_NO_ACK);
-          af.setDataPolicy(DataPolicy.REPLICATE);
-          if (addListener) {
-            CacheListener cl = new CacheListenerAdapter() {
-                public void afterUpdate(EntryEvent event) {
-                  // make the slow receiver event slower!
-                  try {Thread.sleep(500);} catch (InterruptedException shuttingDown) {fail("interrupted");}
-                }
-              };
-            af.setCacheListener(cl);
-          } else {
-            CacheListener cl = new CacheListenerAdapter() {
-                public void afterCreate(EntryEvent event) {
-//                   getLogWriter().info("afterCreate " + event.getKey());
-                  if (event.getCallbackArgument() != null) {
-                    lastCallback = event.getCallbackArgument();
-                  }
-                  if (event.getKey().equals("sleepkey")) {
-                    int sleepMs = ((Integer)event.getNewValue()).intValue();
-//                     getLogWriter().info("sleepkey sleeping for " + sleepMs);
-                    try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
-                  }
-                }
-                public void afterUpdate(EntryEvent event) {
-//                   getLogWriter().info("afterUpdate " + event.getKey());
-                  if (event.getCallbackArgument() != null) {
-                    lastCallback = event.getCallbackArgument();
-                  }
-                  if (event.getKey().equals("sleepkey")) {
-                    int sleepMs = ((Integer)event.getNewValue()).intValue();
-//                     getLogWriter().info("sleepkey sleeping for " + sleepMs);
-                    try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
-                  }
-                }
-                public void afterInvalidate(EntryEvent event) {
-                  if (event.getCallbackArgument() != null) {
-                    lastCallback = event.getCallbackArgument();
-                  }
-                }
-                public void afterDestroy(EntryEvent event) {
-                  if (event.getCallbackArgument() != null) {
-                    lastCallback = event.getCallbackArgument();
-                  }
-                }
-              };
-            af.setCacheListener(cl);
-          }
-          Region r1 = createRootRegion("slowrec", af.create());
-          // place holder so we receive updates
-          r1.create("key", "value");
-        }
-      });
-  }
-  static protected final String CHECK_INVALID = "CHECK_INVALID";
-  
-  private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
-    VM vm = getOtherVm();
-    vm.invoke(new CacheSerializableRunnable("check last value") {
-        public void run2() throws CacheException {
-          Region r1 = getRootRegion("slowrec");
-          if (lcb != null) {
-            WaitCriterion ev = new WaitCriterion() {
-              public boolean done() {
-                return lcb.equals(lastCallback);
-              }
-              public String description() {
-                return "waiting for callback";
-              }
-            };
-            DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
-            assertEquals(lcb, lastCallback);
-          }
-          if (lastValue == null) {
-            final Region r = r1;
-            WaitCriterion ev = new WaitCriterion() {
-              public boolean done() {
-                return r.getEntry("key") == null;
-              }
-              public String description() {
-                return "waiting for key to become null";
-              }
-            };
-            DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
-            assertEquals(null, r1.getEntry("key"));
-          } else if (CHECK_INVALID.equals(lastValue)) {
-            // should be invalid
-            {
-              final Region r = r1;
-              WaitCriterion ev = new WaitCriterion() {
-                public boolean done() {
-                  Entry e = r.getEntry("key");
-                  if (e == null) {
-                    return false;
-                  }
-                  return e.getValue() == null;
-                }
-                public String description() {
-                  return "waiting for invalidate";
-                }
-              };
-              DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
-//              assertNotNull(re);
-//              assertEquals(null, value);
-            }
-          } else {
-            {
-              int retryCount = 1000;
-              Region.Entry re = null;
-              Object value = null;
-              while (retryCount-- > 0) {
-                re = r1.getEntry("key");
-                if (re != null) {
-                  value = re.getValue();
-                  if (value != null && value.equals(lastValue)) {
-                    break;
-                  }
-                }
-                try {Thread.sleep(50);} catch (InterruptedException ignore) {fail("interrupted");}
-              }
-              assertNotNull(re);
-              assertNotNull(value);
-              assertEquals(lastValue, value);
-            }
-          }
-        }
-      });
-  }
-
-  private void forceQueueFlush() {
-    Connection.FORCE_ASYNC_QUEUE=false;
-    final DMStats stats = getSystem().getDistributionManager().getStats();
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return stats.getAsyncThreads() == 0;
-      }
-      public String description() {
-        return "Waiting for async threads to disappear";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
-  }
-  
-  private void forceQueuing(final Region r) throws CacheException {
-    Connection.FORCE_ASYNC_QUEUE=true;
-    final DMStats stats = getSystem().getDistributionManager().getStats();
-    r.put("forcekey", "forcevalue");
-    
-    // wait for the flusher to get its first flush in progress
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return stats.getAsyncQueueFlushesInProgress() != 0;
-      }
-      public String description() {
-        return "waiting for flushes to start";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
-  }
-  
-  /**
-   * Make sure that noack puts to a receiver
-   * will eventually queue and then catch up.
-   */
-  public void testNoAck() throws CacheException {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final DMStats stats = getSystem().getDistributionManager().getStats();
-
-    // create receiver in vm0 with queuing enabled
-    Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "1");
-    doCreateOtherVm(p, false);
-
-    int repeatCount = 2;
-    int count = 0;
-    while (repeatCount-- > 0) {
-      forceQueuing(r);
-      final Object key = "key";
-      long queuedMsgs = stats.getAsyncQueuedMsgs();
-      long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-//      long conflatedMsgs = stats.getAsyncConflatedMsgs();
-      long queueSize = stats.getAsyncQueueSize();
-      String lastValue = "";
-      final long intialQueuedMsgs = queuedMsgs;
-      long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
-      try {
-        // loop while we still have queued the initially queued msgs
-        // OR the cur # of queued msgs < 6
-        while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
-          String value = "count=" + count;
-          lastValue = value;
-          r.put(key, value);
-          count ++;
-          queueSize = stats.getAsyncQueueSize();
-          queuedMsgs = stats.getAsyncQueuedMsgs();
-          dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-          curQueuedMsgs = queuedMsgs - dequeuedMsgs;
-        }
-        getLogWriter().info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
-      } finally {
-        forceQueueFlush();
-      }
-      WaitCriterion ev = new WaitCriterion() {
-        public boolean done() {
-          return stats.getAsyncQueueSize() == 0;
-        }
-        public String description() {
-          return "Waiting for queues to empty";
-        }
-      };
-      final long start = System.currentTimeMillis();
-      DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
-      final long finish = System.currentTimeMillis();
-      getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);
-    
-      checkLastValueInOtherVm(lastValue, null);
-    }
-  }
-  /**
-   * Create a region named AckRegion with ACK scope
-   */
-  protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    if (mirror) {
-      factory.setDataPolicy(DataPolicy.REPLICATE);
-    }
-    if (conflate) {
-      factory.setEnableAsyncConflation(true);
-    }
-    final Region r = createRootRegion("AckRegion", factory.create());
-    return r;
-  }
-  /**
-   * Make sure that noack puts to a receiver
-   * will eventually queue and then catch up with conflation
-   */
-  public void testNoAckConflation() throws CacheException {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    factory.setEnableAsyncConflation(true);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final DMStats stats = getSystem().getDistributionManager().getStats();
-
-    // create receiver in vm0 with queuing enabled
-    Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "1");
-    doCreateOtherVm(p, false);
-
-    forceQueuing(r);
-    final Object key = "key";
-    int count = 0;
-//    long queuedMsgs = stats.getAsyncQueuedMsgs();
-//    long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    final long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-//    long queueSize = stats.getAsyncQueueSize();
-    String lastValue = "";
-    final long intialDeQueuedMsgs = stats.getAsyncDequeuedMsgs();
-    long start = 0;
-    try {
-      while ((stats.getAsyncConflatedMsgs()-initialConflatedMsgs) < 1000) {
-        String value = "count=" + count;
-        lastValue = value;
-        r.put(key, value);
-        count ++;
-        //       getLogWriter().info("After " + count + " "
-        //                           + " puts queueSize=" + queueSize
-        //                           + "    queuedMsgs=" + queuedMsgs
-        //                           + "  dequeuedMsgs=" + dequeuedMsgs
-        //                           + " conflatedMsgs=" + conflatedMsgs);
-      }
-      start = System.currentTimeMillis();
-    } finally {
-      forceQueueFlush();
-    }
-//     queueSize = stats.getAsyncQueueSize();
-//     queuedMsgs = stats.getAsyncQueuedMsgs();
-
-//     getLogWriter().info("After " + count + " "
-//                         + " puts slowrec mode kicked in by queuing "
-//                         + queuedMsgs + " for a total size of " + queueSize
-//                         + " conflatedMsgs=" + conflatedMsgs
-//                         + " dequeuedMsgs=" + dequeuedMsgs);
-//     final long start = System.currentTimeMillis();
-//     while (stats.getAsyncQueuedMsgs() > stats.getAsyncDequeuedMsgs()) {
-//       try {Thread.sleep(100);} catch (InterruptedException ignore) {}
-//       queueSize = stats.getAsyncQueueSize();
-//       queuedMsgs = stats.getAsyncQueuedMsgs();
-//       dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-//       conflatedMsgs = stats.getAsyncConflatedMsgs();
-//       getLogWriter().info("After sleeping"
-//                           + "     queueSize=" + queueSize
-//                           + "    queuedMsgs=" + queuedMsgs
-//                           + "  dequeuedMsgs=" + dequeuedMsgs
-//                           + " conflatedMsgs=" + conflatedMsgs);
-    final long finish = System.currentTimeMillis();
-    getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + (stats.getAsyncDequeuedMsgs()-intialDeQueuedMsgs) + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The lastValue was " + lastValue);
-    
-    checkLastValueInOtherVm(lastValue, null);
-  }
-  /**
-   * Make sure an ack put does not hang.
-   * Make sure two ack updates do not conflate but are both queued.
-   */
-  public void testAckConflation() throws CacheException {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    factory.setEnableAsyncConflation(true);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final Region ar = createAckRegion(false, true);
-    ar.create("ackKey", "ackValue");
-    
-    final DMStats stats = getSystem().getDistributionManager().getStats();
-
-    // create receiver in vm0 with queuing enabled
-    Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "2");
-    doCreateOtherVm(p, false);
-
-    forceQueuing(r);
-    {
-      // make sure ack does not hang
-      // make sure two ack updates do not conflate but are both queued
-      long startQueuedMsgs = stats.getAsyncQueuedMsgs();
-      long startConflatedMsgs = stats.getAsyncConflatedMsgs();
-      Thread t = new Thread(new Runnable() {
-          public void run() {
-            ar.put("ackKey", "ackValue");
-          }
-        });
-      t.start();
-      Thread t2 = new Thread(new Runnable() {
-          public void run() {
-            ar.put("ackKey", "ackValue");
-          }
-        });
-      t2.start();
-      // give threads a chance to get queued
-      try {Thread.sleep(100);} catch (InterruptedException ignore) {fail("interrupted");}
-      forceQueueFlush();
-      DistributedTestCase.join(t, 2 * 1000, getLogWriter());
-      DistributedTestCase.join(t2, 2 * 1000, getLogWriter());
-      long endQueuedMsgs = stats.getAsyncQueuedMsgs();
-      long endConflatedMsgs = stats.getAsyncConflatedMsgs();
-      assertEquals(startConflatedMsgs, endConflatedMsgs);
-      // queue should be flushed by the time we get an ack
-      assertEquals(endQueuedMsgs, stats.getAsyncDequeuedMsgs());
-      assertEquals(startQueuedMsgs+2, endQueuedMsgs);
-    }
-  }
-  /**
-   * Make sure that only sequences of updates are conflated.
-   * Also checks that sending to a conflating region and a non-conflating region
-   * does the correct thing.
-   * Test disabled because it intermittently fails due to race conditions
-   * in the test. This has been fixed in congo's tests. See bug 35357.
-   */
-  public void _disabled_testConflationSequence() throws CacheException {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    factory.setEnableAsyncConflation(true);
-    final Region r = createRootRegion("slowrec", factory.create());
-    factory.setEnableAsyncConflation(false);
-    final Region noConflate = createRootRegion("noConflate", factory.create());
-    final DMStats stats = getSystem().getDistributionManager().getStats();
-
-    // create receiver in vm0 with queuing enabled
-    Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "1");
-    doCreateOtherVm(p, false);
-    {
-      VM vm = getOtherVm();
-      vm.invoke(new CacheSerializableRunnable("create noConflate") {
-          public void run2() throws CacheException {
-            AttributesFactory af = new AttributesFactory();
-            af.setScope(Scope.DISTRIBUTED_NO_ACK);
-            af.setDataPolicy(DataPolicy.REPLICATE);
-            createRootRegion("noConflate", af.create());
-          }
-        });
-    }
-
-    // now make sure update+destroy does not conflate
-    final Object key = "key";      
-    getLogWriter().info("[testConflationSequence] about to force queuing");
-    forceQueuing(r);
-
-    int count = 0;
-    String value = "";
-    String lastValue = value;
-    Object mylcb = null;
-    long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-//    long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-//    long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    int endCount = count+60;
-
-    getLogWriter().info("[testConflationSequence] about to build up queue");
-    long begin = System.currentTimeMillis();
-    while (count < endCount) {
-      value = "count=" + count;
-      lastValue = value;
-      r.create(key, value);
-      count ++;
-      value = "count=" + count;
-      lastValue = value;
-      r.put(key, value);
-      count ++;
-      mylcb = value;
-      r.destroy(key, mylcb);
-      count ++;
-      lastValue = null;
-//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
-    }
-    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
-    forceQueueFlush();
-    checkLastValueInOtherVm(lastValue, mylcb);
-
-    // now make sure create+update+localDestroy does not conflate
-    getLogWriter().info("[testConflationSequence] force queuing create-update-destroy");
-    forceQueuing(r);
-    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    endCount = count + 40;
-    
-    getLogWriter().info("[testConflationSequence] create-update-destroy");
-    begin = System.currentTimeMillis();
-    while (count < endCount) {
-      value = "count=" + count;
-      lastValue = value;
-      r.create(key, value);
-      count++;
-      value = "count=" + count;
-      lastValue = value;
-      r.put(key, value);
-      count ++;
-      r.localDestroy(key);
-//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
-    }
-    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
-    forceQueueFlush();
-    checkLastValueInOtherVm(lastValue, null);
-
-    // now make sure update+invalidate does not conflate
-    getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
-    forceQueuing(r);
-    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
-//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    value = "count=" + count;
-    lastValue = value;
-    r.create(key, value);
-    count++;
-//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    endCount = count + 40;
-
-    getLogWriter().info("[testConflationSequence] update-invalidate");
-    begin = System.currentTimeMillis();
-    while (count < endCount) {
-      value = "count=" + count;
-      lastValue = value;
-      r.put(key, value);
-      count ++;
-      r.invalidate(key);
-      count ++;
-      lastValue = CHECK_INVALID;
-//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
-    }
-    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
-    forceQueueFlush();
-    getLogWriter().info("[testConflationSequence] assert other vm");
-    checkLastValueInOtherVm(lastValue, null);
-
-    r.destroy(key);
-
-    // now make sure updates to a conflating region are conflated even while
-    // updates to a non-conflating are not.
-    getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
-    forceQueuing(r);
-    final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
-//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    
-    value = "count=" + count;
-    lastValue = value;
-    long conflatedMsgs = stats.getAsyncConflatedMsgs();
-    long queuedMsgs = stats.getAsyncQueuedMsgs();
-    r.create(key, value);
-    queuedMsgs++;
-    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
-    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-    r.put(key, value);
-    queuedMsgs++;
-    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
-    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-    noConflate.create(key, value);
-    queuedMsgs++;
-    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
-    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-    noConflate.put(key, value);
-    queuedMsgs++;
-    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
-    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-    count++;
-//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-    endCount = count + 80;
-
-    begin = System.currentTimeMillis();
-    getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
-                        + " queuedMsgs=" + stats.getAsyncQueuedMsgs()
-                        + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
-                        + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
-                        + " asyncSocketWrites=" + stats.getAsyncSocketWrites()
-                        );
-    while (count < endCount) {
-      // make sure we continue to have a flush in progress
-      assertEquals(1, stats.getAsyncThreads());
-      assertEquals(1, stats.getAsyncQueues());
-      assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
-      // make sure we are not completing any flushing while this loop is in progress
-      assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
-      value = "count=" + count;
-      lastValue = value;
-      r.put(key, value);
-      count ++;
-      // make sure it was conflated and not queued
-      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
-      conflatedMsgs++;
-      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-      noConflate.put(key, value);
-      // make sure it was queued and not conflated
-      queuedMsgs++;
-      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
-      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
-//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
-      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
-    }
-
-    forceQueueFlush();
-    getLogWriter().info("[testConflationSequence] assert other vm");
-    checkLastValueInOtherVm(lastValue, null);
-  }
-  /**
-   * Make sure that exceeding the queue size limit causes a disconnect.
-   */
-  public void testSizeDisconnect() throws CacheException {
-    final String expected = 
-      "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
-      "||java.io.IOException: Broken pipe";
-    final String addExpected = 
-      "<ExpectedException action=add>" + expected + "</ExpectedException>";
-    final String removeExpected = 
-      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final DM dm = getSystem().getDistributionManager();
-    final DMStats stats = dm.getStats();
-    // set others before vm0 connects
-    final Set others = dm.getOtherDistributionManagerIds();
-
-    // create receiver in vm0 with queuing enabled
-    Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "5");
-    p.setProperty("async-max-queue-size", "1"); // 1 meg
-    doCreateOtherVm(p, false);
-
-    
-    final Object key = "key";
-    final int VALUE_SIZE = 1024 * 100; // with the 1M async-max-queue-size we should get 10 of these 100K msgs before the queue is full
-    final byte[] value = new byte[VALUE_SIZE];
-    int count = 0;
-    forceQueuing(r);
-    long queuedMsgs = stats.getAsyncQueuedMsgs();
-    long queueSize = stats.getAsyncQueueSize();
-    
-    getCache().getLogger().info(addExpected);
-    try {    
-      while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
-        r.put(key, value);
-        count ++;
-        if (stats.getAsyncQueueSize() > 0) {
-          queuedMsgs = stats.getAsyncQueuedMsgs();
-          queueSize = stats.getAsyncQueueSize();
-        }
-        if (count > 100) {
-          fail("should have exceeded max-queue-size by now");
-        }
-      }
-      getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
-      // make sure we lost a connection to vm0
-      WaitCriterion ev = new WaitCriterion() {
-        public boolean done() {
-          return dm.getOtherDistributionManagerIds().size() <= others.size()
-              && stats.getAsyncQueueSize() == 0;
-        }
-        public String description() {
-          return "waiting for connection loss";
-        }
-      };
-      DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
-    }
-    finally {
-      forceQueueFlush();
-      getCache().getLogger().info(removeExpected);
-    }
-    assertEquals(others, dm.getOtherDistributionManagerIds());
-    assertEquals(0, stats.getAsyncQueueSize());
-  }
-  /**
-   * Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
-   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
-   * in build.xml in the splitbrainNov07 branch.  It had been disabled since
-   * June 2006 due to hangs.  Some of the tests, like this one, still need
-   * work because they periodically (some quite often) fail.
-   */
-  public void donottestTimeoutDisconnect() throws CacheException {
-    final String expected = 
-      "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
-      "||java.io.IOException: Broken pipe";
-    final String addExpected = 
-      "<ExpectedException action=add>" + expected + "</ExpectedException>";
-    final String removeExpected = 
-      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
-      
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final DM dm = getSystem().getDistributionManager();
-    final DMStats stats = dm.getStats();
-    // set others before vm0 connects
-    final Set others = dm.getOtherDistributionManagerIds();
-
-    // create receiver in vm0 with queuing enabled
-    Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "5");
-    p.setProperty("async-queue-timeout", "500"); // 500 ms
-    doCreateOtherVm(p, true);
-
-    
-    final Object key = "key";
-    final int VALUE_SIZE = 1024; // 1k
-    final byte[] value = new byte[VALUE_SIZE];
-    int count = 0;
-    long queuedMsgs = stats.getAsyncQueuedMsgs();
-    long queueSize = stats.getAsyncQueueSize();
-    final long timeoutLimit = System.currentTimeMillis() + 5000;
-
-    getCache().getLogger().info(addExpected);
-    try {    
-      while (stats.getAsyncQueueTimeouts() == 0) {
-        r.put(key, value);
-        count ++;
-        if (stats.getAsyncQueueSize() > 0) {
-          queuedMsgs = stats.getAsyncQueuedMsgs();
-          queueSize = stats.getAsyncQueueSize();
-        }
-        if (System.currentTimeMillis() > timeoutLimit) {
-          fail("should have exceeded async-queue-timeout by now");
-        }
-      }
-      getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
-      // make sure we lost a connection to vm0
-      WaitCriterion ev = new WaitCriterion() {
-        public boolean done() {
-          if (dm.getOtherDistributionManagerIds().size() > others.size()) {
-            return false;
-          }
-          return stats.getAsyncQueueSize() == 0;
-        }
-        public String description() {
-          return "waiting for departure";
-        }
-      };
-      DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
-    }
-    finally {
-      getCache().getLogger().info(removeExpected);
-    }
-    assertEquals(others, dm.getOtherDistributionManagerIds());
-    assertEquals(0, stats.getAsyncQueueSize());
-  }
-
-  // static helper methods ---------------------------------------------------
-  
-  private static final String KEY_SLEEP = "KEY_SLEEP";
-  private static final String KEY_WAIT = "KEY_WAIT";
-  private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
-  
-  protected final static int CALLBACK_CREATE = 0;
-  protected final static int CALLBACK_UPDATE = 1;
-  protected final static int CALLBACK_INVALIDATE = 2;
-  protected final static int CALLBACK_DESTROY = 3;
-  protected final static int CALLBACK_REGION_INVALIDATE = 4;
-  
-  protected final static Integer CALLBACK_CREATE_INTEGER = new Integer(CALLBACK_CREATE);
-  protected final static Integer CALLBACK_UPDATE_INTEGER = new Integer(CALLBACK_UPDATE);
-  protected final static Integer CALLBACK_INVALIDATE_INTEGER = new Integer(CALLBACK_INVALIDATE);
-  protected final static Integer CALLBACK_DESTROY_INTEGER = new Integer(CALLBACK_DESTROY);
-  protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = new Integer(CALLBACK_REGION_INVALIDATE);
-
-  private static class CallbackWrapper {
-    public final Object callbackArgument;
-    public final  int callbackType;
-    public CallbackWrapper(Object callbackArgument, int callbackType) {
-      this.callbackArgument = callbackArgument;
-      this.callbackType = callbackType;
-    }
-    public String toString() {
-      return "CallbackWrapper: " + callbackArgument.toString() + " of type " + callbackType;
-    }
-  }
-  
-  protected static class ControlListener extends CacheListenerAdapter {
-    public final LinkedList callbackArguments = new LinkedList();
-    public final LinkedList callbackTypes = new LinkedList();
-    public final Object CONTROL_LOCK = new Object();
-    
-    public void afterCreate(EntryEvent event) {
-      getLogWriter().info(event.getRegion().getName() + " afterCreate " + event.getKey());
-      synchronized(this.CONTROL_LOCK) {
-        if (event.getCallbackArgument() != null) {
-          this.callbackArguments.add(
-            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
-          this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
-          this.CONTROL_LOCK.notifyAll();
-        }
-      }
-      processEvent(event);
-    }
-    public void afterUpdate(EntryEvent event) {
-      getLogWriter().info(event.getRegion().getName() + " afterUpdate " + event.getKey());
-      synchronized(this.CONTROL_LOCK) {
-        if (event.getCallbackArgument() != null) {
-          this.callbackArguments.add(
-            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
-          this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
-          this.CONTROL_LOCK.notifyAll();
-        }
-      }
-      processEvent(event);
-    }
-    public void afterInvalidate(EntryEvent event) {
-      synchronized(this.CONTROL_LOCK) {
-        if (event.getCallbackArgument() != null) {
-          this.callbackArguments.add(
-            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_INVALIDATE));
-          this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
-          this.CONTROL_LOCK.notifyAll();
-        }
-      }
-    }
-    public void afterDestroy(EntryEvent event) {
-      synchronized(this.CONTROL_LOCK) {
-        if (event.getCallbackArgument() != null) {
-          this.callbackArguments.add(
-            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_DESTROY));
-          this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
-          this.CONTROL_LOCK.notifyAll();
-        }
-      }
-    }
-    public void afterRegionInvalidate(RegionEvent event) {
-      synchronized(this.CONTROL_LOCK) {
-        if (event.getCallbackArgument() != null) {
-          this.callbackArguments.add(
-            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE));
-          this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
-          this.CONTROL_LOCK.notifyAll();
-        }
-      }
-    }
-    private void processEvent(EntryEvent event) {
-      if (event.getKey().equals(KEY_SLEEP)) {
-        processSleep(event);
-      }
-      else if (event.getKey().equals(KEY_WAIT)) {
-        processWait(event);
-      }
-      else if (event.getKey().equals(KEY_DISCONNECT)) {
-        processDisconnect(event);
-      }
-    }
-    private void processSleep(EntryEvent event) {
-      int sleepMs = ((Integer)event.getNewValue()).intValue();
-      getLogWriter().info("[processSleep] sleeping for " + sleepMs);
-      try {
-        Thread.sleep(sleepMs);
-      } catch (InterruptedException ignore) {fail("interrupted");}
-    }
-    private void processWait(EntryEvent event) {
-      int sleepMs = ((Integer)event.getNewValue()).intValue();
-      getLogWriter().info("[processWait] waiting for " + sleepMs);
-      synchronized(this.CONTROL_LOCK) {
-        try {
-          this.CONTROL_LOCK.wait(sleepMs);
-        } catch (InterruptedException ignore) {return;}
-      }
-    }
-    private void processDisconnect(EntryEvent event) {
-      getLogWriter().info("[processDisconnect] disconnecting");
-      disconnectFromDS();
-    }
-  };
-
-  /**
-   * Make sure multiple no-ack regions conflate properly.
-   * [bruce] disabled when use of this dunit test class was reenabled in
-   * the splitbrainNov07 branch.  The class had been disabled since
-   * June 2006 r13222 in the trunk.  This test is failing because conflation
-   * isn't kicking in for some reason.
-   */
-  public void donottestMultipleRegionConflation() throws Throwable {
-    try {
-      doTestMultipleRegionConflation();
-    }
-    catch (VirtualMachineError e) {
-      SystemFailure.initiateFailure(e);
-      throw e;
-    }
-    catch (Throwable t) {
-      getLogWriter().error("Encountered exception: ", t);
-      throw t;
-    }
-    finally {
-      // make sure other vm was notified even if test failed
-      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
-        public void run() {
-          synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
-            doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
-          }
-        }
-      });
-    }
-  }
-  protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
-  protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
-  private void doTestMultipleRegionConflation() throws Exception {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    factory.setEnableAsyncConflation(true);
-    final Region r1 = createRootRegion("slowrec1", factory.create());
-    final Region r2 = createRootRegion("slowrec2", factory.create());
-    
-    assertTrue(getSystem().isConnected());
-    assertNotNull(r1);
-    assertFalse(r1.isDestroyed());
-    assertNotNull(getCache());
-    assertNotNull(getCache().getRegion("slowrec1"));
-    assertNotNull(r2);
-    assertFalse(r2.isDestroyed());
-    assertNotNull(getCache());
-    assertNotNull(getCache().getRegion("slowrec2"));
-    
-    final DM dm = getSystem().getDistributionManager();
-    final Serializable controllerVM = dm.getDistributionManagerId();
-    final DMStats stats = dm.getStats();
-    final int millisToWait = 1000 * 60 * 5; // 5 minutes
-    
-    // set others before vm0 connects
-    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-
-    // create receiver in vm0 with queuing enabled
-    final Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "5");
-    p.setProperty("async-queue-timeout", "86400000"); // max value
-    p.setProperty("async-max-queue-size", "1024"); // max value
-
-    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
-      public void run2() throws CacheException {
-        getSystem(p);
-        
-        DM dm = getSystem().getDistributionManager();
-        assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
-        
-        AttributesFactory af = new AttributesFactory();
-        af.setScope(Scope.DISTRIBUTED_NO_ACK);
-        af.setDataPolicy(DataPolicy.REPLICATE);
-        
-        doTestMultipleRegionConflation_R1_Listener = new ControlListener();
-        af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
-        createRootRegion("slowrec1", af.create());
-        
-        doTestMultipleRegionConflation_R2_Listener = new ControlListener();
-        af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
-        createRootRegion("slowrec2", af.create());
-      }
-    });
-    
-    // put vm0 cache listener into wait
-    getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 into wait");
-    r1.put(KEY_WAIT, new Integer(millisToWait));
-
-    // build up queue size
-    getLogWriter().info("[doTestMultipleRegionConflation] building up queue size...");
-    final Object key = "key";
-    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
-    final int VALUE_SIZE = socketBufferSize*3;
-    //final int VALUE_SIZE = 1024 * 1024  ; // 1 MB
-    final byte[] value = new byte[VALUE_SIZE];
-
-    int count = 0;
-    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
-      count++;
-      r1.put(key, value);
-    }
-    
-    getLogWriter().info("[doTestMultipleRegionConflation] After " + 
-      count + " puts of size " + VALUE_SIZE + 
-      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
-    // put values that will be asserted
-    final Object key1 = "key1";
-    final Object key2 = "key2";
-    Object putKey = key1;
-    boolean flag = true;
-    for (int i = 0; i < 30; i++) {
-      if (i == 10) putKey = key2;
-      if (flag) {
-        if (i == 6) {
-          r1.invalidate(putKey, new Integer(i));
-        } else if (i == 24) {
-          r1.invalidateRegion(new Integer(i));
-        } else {
-          r1.put(putKey, value, new Integer(i));
-        }
-      } else {
-        if (i == 15) {
-          r2.destroy(putKey, new Integer(i));
-        } else {
-          r2.put(putKey, value, new Integer(i));
-        }
-      }
-      flag = !flag;
-    }
-    
-    // r1: key1, 0, create
-    // r1: key1, 4, update
-    // r1: key1, 6, invalidate
-    // r1: key1, 8, update
-    
-    // r1: key2, 10, create
-    // r1:       24, invalidateRegion
-    // r1: key2, 28, update
-
-    // r2: key1, 1, create
-    // r2: key1, 9, update
-    
-    // r2: key2, 11, create
-    // r2: key2, 13, update
-    // r2: key2, 15, destroy
-    // r2: key2, 17, create
-    // r2: key2, 29, update
-    
-    final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 }; 
-    final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
-      { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
-        CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE }; 
-    
-    final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
-    final int[] r2ExpectedTypes = new int[] 
-      { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
-        CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE }; 
-
-    // send notify to vm0
-    getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
-    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
-      public void run() {
-        synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
-          doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
-        }
-      }
-    });
-    
-    // wait for queue to be flushed
-    getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
-    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
-      public void run() {
-        try {
-          synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
-            while (doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
-              doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
-            }
-          }
-          synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
-            while (doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
-              doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
-            }
-          }
-        } catch (InterruptedException ignore) {fail("interrupted");}
-      }
-    });
-    
-    // assert values on both listeners
-    getLogWriter().info("[doTestMultipleRegionConflation] assert callback arguments");
-    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
-      public void run() {
-        synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
-          getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
-          getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
-          assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
-                       doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
-          int i = 0;
-          for (Iterator iter = doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter.hasNext();) {
-            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
-            assertEquals(new Integer(r1ExpectedArgs[i]), 
-              wrapper.callbackArgument);
-            assertEquals(new Integer(r1ExpectedTypes[i]), 
-              doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
-            i++;
-          }
-        }
-        synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
-          getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
-          getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
-          assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
-                       doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
-          int i = 0;
-          for (Iterator iter = doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter.hasNext();) {
-            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
-            assertEquals(new Integer(r2ExpectedArgs[i]), 
-              wrapper.callbackArgument);
-            assertEquals(new Integer(r2ExpectedTypes[i]), 
-              doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
-            i++;
-          }
-        }
-      }
-    });
-  }
-
-  /**
-   * Make sure a disconnect causes queue memory to be released.
-   */
-  public void testDisconnectCleanup() throws Throwable {
-    try {
-      doTestDisconnectCleanup();
-    }
-    catch (VirtualMachineError e) {
-      SystemFailure.initiateFailure(e);
-      throw e;
-    }
-    catch (Throwable t) {
-      getLogWriter().error("Encountered exception: ", t);
-      throw t;
-    }
-    finally {
-      // make sure other vm was notified even if test failed
-      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
-        public void run() {
-          synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
-            doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
-          }
-        }
-      });
-    }
-  }
-  protected static ControlListener doTestDisconnectCleanup_Listener;
-  private void doTestDisconnectCleanup() throws Exception {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final DM dm = getSystem().getDistributionManager();
-    final DMStats stats = dm.getStats();
-    // set others before vm0 connects
-    final Set others = dm.getOtherDistributionManagerIds();
-    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-    final int initialQueues = stats.getAsyncQueues();
-
-    // create receiver in vm0 with queuing enabled
-    final Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", "5");
-    p.setProperty("async-queue-timeout", "86400000"); // max value
-    p.setProperty("async-max-queue-size", "1024"); // max value
-    
-    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
-      public void run2() throws CacheException {
-        getSystem(p);
-        AttributesFactory af = new AttributesFactory();
-        af.setScope(Scope.DISTRIBUTED_NO_ACK);
-        af.setDataPolicy(DataPolicy.REPLICATE);
-        
-        doTestDisconnectCleanup_Listener = new ControlListener();
-        af.setCacheListener(doTestDisconnectCleanup_Listener);
-        createRootRegion("slowrec", af.create());
-      }
-    });
-
-    // put vm0 cache listener into wait
-    getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
-    int millisToWait = 1000 * 60 * 5; // 5 minutes
-    r.put(KEY_WAIT, new Integer(millisToWait));
-    r.put(KEY_DISCONNECT, KEY_DISCONNECT);
-
-    // build up queue size
-    getLogWriter().info("[testDisconnectCleanup] building up queue size...");
-    final Object key = "key";
-    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
-    final int VALUE_SIZE = socketBufferSize*3;
-    //final int VALUE_SIZE = 1024 * 1024  ; // 1 MB
-    final byte[] value = new byte[VALUE_SIZE];
-
-    int count = 0;
-    final long abortMillis = System.currentTimeMillis() + millisToWait;
-    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
-      count++;
-      r.put(key, value);
-      assertFalse(System.currentTimeMillis() >= abortMillis);
-    }
-    
-    getLogWriter().info("[testDisconnectCleanup] After " + 
-      count + " puts of size " + VALUE_SIZE + 
-      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
-    while (stats.getAsyncQueuedMsgs() < 10 ||
-           stats.getAsyncQueueSize() < VALUE_SIZE*10) {
-      count++;
-      r.put(key, value);
-      assertFalse(System.currentTimeMillis() >= abortMillis);
-    }
-    assertTrue(stats.getAsyncQueuedMsgs() >= 10);
-
-    while (stats.getAsyncQueues() < 1) {
-      pause(100);
-      assertFalse(System.currentTimeMillis() >= abortMillis);
-    }
-    
-    getLogWriter().info("[testDisconnectCleanup] After " + 
-      count + " puts of size " + VALUE_SIZE + " queue size has reached " + 
-      stats.getAsyncQueueSize() + " bytes and number of queues is " + 
-      stats.getAsyncQueues() + ".");
-
-    assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
-    assertEquals(initialQueues+1, stats.getAsyncQueues());
-
-    // assert vm0 is still connected
-    assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
-    
-    // send notify to vm0
-    getLogWriter().info("[testDisconnectCleanup] wake up vm0");
-    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
-      public void run() {
-        synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
-          doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
-        }
-      }
-    });
-    
-    // make sure we lost a connection to vm0
-    getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return dm.getOtherDistributionManagerIds().size() <= others.size();
-      }
-      public String description() {
-        return "waiting for disconnect";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
-    assertEquals(others, dm.getOtherDistributionManagerIds());
-    
-    // check free memory... perform wait loop with System.gc
-    getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
-    ev = new WaitCriterion() {
-      public boolean done() {
-        if (stats.getAsyncQueues() <= initialQueues) {
-          return true;
-        }
-        Runtime.getRuntime().gc();
-        return false;
-      }
-      public String description() {
-        return "waiting for queue cleanup";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
-//    getLogWriter().info("[testDisconnectCleanup] initialQueues=" + 
-//      initialQueues + " asyncQueues=" + stats.getAsyncQueues());
-    assertEquals(initialQueues, stats.getAsyncQueues());
-  }
-
-  /**
-   * Make sure a partially transmitted message is handled correctly when
-   * queuing kicks in.<p>
-   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
-   * in build.xml in the splitbrainNov07 branch.  It had been disabled since
-   * June 2006 due to hangs.  Some of the tests, like this one, still need
-   * work because they periodically (some quite often) fail.
-   */
-  public void donottestPartialMessage() throws Throwable {
-    try {
-      doTestPartialMessage();
-    }
-    catch (VirtualMachineError e) {
-      SystemFailure.initiateFailure(e);
-      throw e;
-    }
-    catch (Throwable t) {
-      getLogWriter().error("Encountered exception: ", t);
-      throw t;
-    }
-    finally {
-      // make sure other vm was notified even if test failed
-      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
-        public void run() {
-          synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
-            doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
-          }
-        }
-      });
-    }
-  }
-  protected static ControlListener doTestPartialMessage_Listener;
-  private void doTestPartialMessage() throws Exception {
-    final AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
-    factory.setEnableAsyncConflation(true);
-    final Region r = createRootRegion("slowrec", factory.create());
-    final DM dm = getSystem().getDistributionManager();
-    final DMStats stats = dm.getStats();
-    
-    // set others before vm0 connects
-//    final Set others = dm.getOtherDistributionManagerIds();
-    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
-//    int initialQueues = stats.getAsyncQueues();
-
-    // create receiver in vm0 with queuing enabled
-    final Properties p = new Properties();
-    p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4 sec
-    p.setProperty("async-queue-timeout", "86400000"); // max value
-    p.setProperty("async-max-queue-size", "1024"); // max value
-    
-    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
-      public void run2() throws CacheException {
-        getSystem(p);
-        AttributesFactory af = new AttributesFactory();
-        af.setScope(Scope.DISTRIBUTED_NO_ACK);
-        af.setDataPolicy(DataPolicy.REPLICATE);
-        
-        doTestPartialMessage_Listener = new ControlListener();
-        af.setCacheListener(doTestPartialMessage_Listener);
-        createRootRegion("slowrec", af.create());
-      }
-    });
-
-    // put vm0 cache listener into wait
-    getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
-    final int millisToWait = 1000 * 60 * 5; // 5 minutes
-    r.put(KEY_WAIT, new Integer(millisToWait));
-
-    // build up queue size
-    getLogWriter().info("[testPartialMessage] building up queue size...");
-    final Object key = "key";
-    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
-    final int VALUE_SIZE = socketBufferSize*3;
-    //final int VALUE_SIZE = 1024 * 20; // 20 KB
-    final byte[] value = new byte[VALUE_SIZE];
-
-    int count = 0;
-    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
-      count++;
-      r.put(key, value, new Integer(count));
-    }
-    
-    final int partialId = count;
-    assertEquals(0, stats.getAsyncConflatedMsgs());
-    
-    getLogWriter().info("[testPartialMessage] After " + 
-      count + " puts of size " + VALUE_SIZE + 
-      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
-
-    pause(2000);
-      
-    // conflate 10 times
-    while (stats.getAsyncConflatedMsgs() < 10) {
-      count++;
-      r.put(key, value, new Integer(count));
-      if (count == partialId+1) {
-//        long begin = System.currentTimeMillis();
-//        while (stats.getAsyncQueues() < 1) {
-//          pause(100);
-//          assertFalse(System.currentTimeMillis() > begin+1000*10);
-//        }
-        assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
-        assertEquals(0, stats.getAsyncConflatedMsgs());
-      } else if (count == partialId+2) {
-        assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
-        assertEquals(1, stats.getAsyncConflatedMsgs());
-      }
-    }
-    
-    final int conflateId = count;
-    
-    final int[] expectedArgs = { partialId, conflateId };
-
-    // send notify to vm0
-    getLogWriter().info("[testPartialMessage] wake up vm0");
-    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
-      public void run() {
-        synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
-          doTestPartialMessage_Listener.CONTROL_LOCK.notify();
-        }
-      }
-    });
-    
-    // wait for queue to be flushed
-    getLogWriter().info("[testPartialMessage] wait for vm0");
-    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
-      public void run() {
-        try {
-          synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
-            boolean done = false;
-            while (!done) {
-              if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
-                CallbackWrapper last = (CallbackWrapper)
-                  doTestPartialMessage_Listener.callbackArguments.getLast();
-                Integer lastId = (Integer) last.callbackArgument;
-                if (lastId.intValue() == conflateId) {
-                  done = true;
-                } else {
-                  doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
-                }
-              } else {
-                doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
-              }
-            }
-          }
-        } catch (InterruptedException ignore) {fail("interrupted");}
-      }
-    });
-    
-    // assert values on both listeners
-    getLogWriter().info("[testPartialMessage] assert callback arguments");
-    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
-      public void run() {
-        synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
-          getLogWriter().info("[testPartialMessage] " +
-              "doTestPartialMessage_Listener.callbackArguments=" + 
-              doTestPartialMessage_Listener.callbackArguments);
-              
-          assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
-                       doTestPartialMessage_Listener.callbackTypes.size());
-                       
-          int i = 0;
-          Iterator argIter = 
-            doTestPartialMessage_Listener.callbackArguments.iterator();
-          Iterator typeIter = 
-            doTestPartialMessage_Listener.callbackTypes.iterator();
-            
-          while (argIter.hasNext()) {
-            CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
-            Integer arg = (Integer) wrapper.callbackArgument;
-            typeIter.next(); // Integer type
-            if (arg.intValue() < partialId) {
-              continue;
-            }
-            assertEquals(new Integer(expectedArgs[i]), arg);
-            //assertEquals(CALLBACK_UPDATE_INTEGER, type);
-            i++;
-          }
-        }
-      }
-    });
-    
-  }
-}
-
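
[The deleted tests above (and their re-added twins in the next message) coordinate the controller VM and the listener VM with a bare Object monitor: the cache listener parks in CONTROL_LOCK.wait(millisToWait), and the controller wakes it with notifyAll() from a SerializableRunnable. A minimal sketch of that handshake, outside of any DUnit plumbing — the class and method names here are illustrative only:

    import java.util.LinkedList;

    public class ControlLockSketch {
      // Shared monitor and event log, mirroring ControlListener in the tests.
      private static final Object CONTROL_LOCK = new Object();
      private static final LinkedList<Object> callbackArguments = new LinkedList<Object>();

      // Listener side: record a callback and wake any waiter.
      static void recordCallback(Object arg) {
        synchronized (CONTROL_LOCK) {
          callbackArguments.add(arg);
          CONTROL_LOCK.notifyAll();
        }
      }

      // Controller side: block until the expected number of callbacks has
      // arrived, re-checking the condition to tolerate spurious wakeups.
      static void awaitCallbacks(int expected, long millisToWait) throws InterruptedException {
        synchronized (CONTROL_LOCK) {
          while (callbackArguments.size() < expected) {
            CONTROL_LOCK.wait(millisToWait);
          }
        }
      }
    }

The condition loop matters: the tests' "Wait for other vm" runnables use the same while-around-wait shape so that a notifyAll fired for an unrelated callback does not end the wait early.]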

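[Polling waits in these tests all go through DUnit's WaitCriterion callback paired with DistributedTestCase.waitForCriterion(criterion, timeoutMs, pollIntervalMs, throwOnTimeout). A minimal sketch of the idiom, reusing the queue-drain condition from testNoAck; it assumes a DMStats named stats is already in scope:

    // Poll done() every 200 ms for up to 30 s; if it never returns true,
    // fail with the message from description() (the final true argument).
    WaitCriterion ev = new WaitCriterion() {
      public boolean done() {
        return stats.getAsyncQueueSize() == 0;
      }
      public String description() {
        return "Waiting for queues to empty";
      }
    };
    DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);]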

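[The receiver VM is put into slow-receiver mode purely through distributed-system properties handed to getSystem(p). A minimal sketch of that configuration, using the same property names the tests set; the units are taken from the tests' own inline comments:

    import java.util.Properties;

    public class SlowReceiverProps {
      static Properties slowReceiverProperties() {
        Properties p = new Properties();
        // millis a synchronous write may block before the sender switches
        // to async queuing for this member
        p.setProperty("async-distribution-timeout", "5");
        // max millis a queue may stay backed up before disconnect
        // ("max value" in most of these tests, "500" in the timeout test)
        p.setProperty("async-queue-timeout", "86400000");
        // max queue size in megabytes before disconnect ("1" meg in
        // testSizeDisconnect, the "1024" max elsewhere)
        p.setProperty("async-max-queue-size", "1024");
        return p;
      }
    }

In the tests this Properties object is passed to getSystem(p) in vm0 before the replicated "slowrec" region is created; the controller VM then observes the resulting queuing through its own DMStats.]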
[2/3] incubator-geode git commit: Remove Disabled from names of tests. Ensure each test has a Category and add Ignore to any test that is disabled due to being broken.

Posted by kl...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
new file mode 100644
index 0000000..e79eddc
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.util.*;
+import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.internal.tcp.Connection;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.*;
+
+import java.io.*;
+import java.util.*;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to make sure slow receiver queuing is working
+ *
+ * @author darrel
+ * @since 4.2.1
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class SlowRecDUnitTest extends CacheTestCase {
+
+  public SlowRecDUnitTest(String name) {
+    super(name);
+  }
+
+  // This test uses a special config for its distributed system, so the
+  // setUp and tearDown methods must make sure we don't reuse the ds from
+  // the previous test and that we don't leave ours around for the next
+  // test to use.
+  
+  public void setUp() throws Exception {
+    try {
+      disconnectAllFromDS();
+    } finally {
+      super.setUp();
+    }
+  }
+  public void tearDown2() throws Exception {
+    try {
+      super.tearDown2();
+    } finally {
+      disconnectAllFromDS();
+    }
+  }
+  
+  //////////////////////  Test Methods  //////////////////////
+
+  private VM getOtherVm() {
+    Host host = Host.getHost(0);
+    return host.getVM(0);
+  }
+
+  static protected Object lastCallback = null;
+
+  private void doCreateOtherVm(final Properties p, final boolean addListener) {
+    VM vm = getOtherVm();
+    vm.invoke(new CacheSerializableRunnable("create root") {
+        public void run2() throws CacheException {
+          getSystem(p);
+          createAckRegion(true, false);
+          AttributesFactory af = new AttributesFactory();
+          af.setScope(Scope.DISTRIBUTED_NO_ACK);
+          af.setDataPolicy(DataPolicy.REPLICATE);
+          if (addListener) {
+            CacheListener cl = new CacheListenerAdapter() {
+                public void afterUpdate(EntryEvent event) {
+                  // make the slow receiver event slower!
+                  try {Thread.sleep(500);} catch (InterruptedException shuttingDown) {fail("interrupted");}
+                }
+              };
+            af.setCacheListener(cl);
+          } else {
+            CacheListener cl = new CacheListenerAdapter() {
+                public void afterCreate(EntryEvent event) {
+//                   getLogWriter().info("afterCreate " + event.getKey());
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                  if (event.getKey().equals("sleepkey")) {
+                    int sleepMs = ((Integer)event.getNewValue()).intValue();
+//                     getLogWriter().info("sleepkey sleeping for " + sleepMs);
+                    try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
+                  }
+                }
+                public void afterUpdate(EntryEvent event) {
+//                   getLogWriter().info("afterUpdate " + event.getKey());
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                  if (event.getKey().equals("sleepkey")) {
+                    int sleepMs = ((Integer)event.getNewValue()).intValue();
+//                     getLogWriter().info("sleepkey sleeping for " + sleepMs);
+                    try {Thread.sleep(sleepMs);} catch (InterruptedException ignore) {fail("interrupted");}
+                  }
+                }
+                public void afterInvalidate(EntryEvent event) {
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                }
+                public void afterDestroy(EntryEvent event) {
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                }
+              };
+            af.setCacheListener(cl);
+          }
+          Region r1 = createRootRegion("slowrec", af.create());
+          // place holder so we receive updates
+          r1.create("key", "value");
+        }
+      });
+  }
+  static protected final String CHECK_INVALID = "CHECK_INVALID";
+  
+  private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
+    VM vm = getOtherVm();
+    vm.invoke(new CacheSerializableRunnable("check last value") {
+        public void run2() throws CacheException {
+          Region r1 = getRootRegion("slowrec");
+          if (lcb != null) {
+            WaitCriterion ev = new WaitCriterion() {
+              public boolean done() {
+                return lcb.equals(lastCallback);
+              }
+              public String description() {
+                return "waiting for callback";
+              }
+            };
+            DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+            assertEquals(lcb, lastCallback);
+          }
+          if (lastValue == null) {
+            final Region r = r1;
+            WaitCriterion ev = new WaitCriterion() {
+              public boolean done() {
+                return r.getEntry("key") == null;
+              }
+              public String description() {
+                return "waiting for key to become null";
+              }
+            };
+            DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+            assertEquals(null, r1.getEntry("key"));
+          } else if (CHECK_INVALID.equals(lastValue)) {
+            // should be invalid
+            {
+              final Region r = r1;
+              WaitCriterion ev = new WaitCriterion() {
+                public boolean done() {
+                  Entry e = r.getEntry("key");
+                  if (e == null) {
+                    return false;
+                  }
+                  return e.getValue() == null;
+                }
+                public String description() {
+                  return "waiting for invalidate";
+                }
+              };
+              DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+//              assertNotNull(re);
+//              assertEquals(null, value);
+            }
+          } else {
+            {
+              int retryCount = 1000;
+              Region.Entry re = null;
+              Object value = null;
+              while (retryCount-- > 0) {
+                re = r1.getEntry("key");
+                if (re != null) {
+                  value = re.getValue();
+                  if (value != null && value.equals(lastValue)) {
+                    break;
+                  }
+                }
+                try {Thread.sleep(50);} catch (InterruptedException ignore) {fail("interrupted");}
+              }
+              assertNotNull(re);
+              assertNotNull(value);
+              assertEquals(lastValue, value);
+            }
+          }
+        }
+      });
+  }
+
+  private void forceQueueFlush() {
+    Connection.FORCE_ASYNC_QUEUE=false;
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return stats.getAsyncThreads() == 0;
+      }
+      public String description() {
+        return "Waiting for async threads to disappear";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
+  }
+  
+  private void forceQueuing(final Region r) throws CacheException {
+    Connection.FORCE_ASYNC_QUEUE=true;
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+    r.put("forcekey", "forcevalue");
+    
+    // wait for the flusher to get its first flush in progress
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return stats.getAsyncQueueFlushesInProgress() != 0;
+      }
+      public String description() {
+        return "waiting for flushes to start";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+  }
+  
+  /**
+   * Make sure that noack puts to a receiver
+   * will eventually queue and then catch up.
+   */
+  public void testNoAck() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "1");
+    doCreateOtherVm(p, false);
+
+    int repeatCount = 2;
+    int count = 0;
+    while (repeatCount-- > 0) {
+      forceQueuing(r);
+      final Object key = "key";
+      long queuedMsgs = stats.getAsyncQueuedMsgs();
+      long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//      long conflatedMsgs = stats.getAsyncConflatedMsgs();
+      long queueSize = stats.getAsyncQueueSize();
+      String lastValue = "";
+      final long initialQueuedMsgs = queuedMsgs;
+      long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
+      try {
+        // loop while the initially queued msgs are still being dequeued
+        // OR the current # of queued msgs is <= 6
+        while (dequeuedMsgs < initialQueuedMsgs || curQueuedMsgs <= 6) {
+          String value = "count=" + count;
+          lastValue = value;
+          r.put(key, value);
+          count ++;
+          queueSize = stats.getAsyncQueueSize();
+          queuedMsgs = stats.getAsyncQueuedMsgs();
+          dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+          curQueuedMsgs = queuedMsgs - dequeuedMsgs;
+        }
+        getLogWriter().info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
+      } finally {
+        forceQueueFlush();
+      }
+      WaitCriterion ev = new WaitCriterion() {
+        public boolean done() {
+          return stats.getAsyncQueueSize() == 0;
+        }
+        public String description() {
+          return "Waiting for queues to empty";
+        }
+      };
+      final long start = System.currentTimeMillis();
+      DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+      final long finish = System.currentTimeMillis();
+      getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);
+    
+      checkLastValueInOtherVm(lastValue, null);
+    }
+  }
+  /**
+   * Create a region named AckRegion with ACK scope
+   */
+  protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    if (mirror) {
+      factory.setDataPolicy(DataPolicy.REPLICATE);
+    }
+    if (conflate) {
+      factory.setEnableAsyncConflation(true);
+    }
+    final Region r = createRootRegion("AckRegion", factory.create());
+    return r;
+  }
+  /**
+   * Make sure that noack puts to a receiver
+   * will eventually queue and then catch up with conflation
+   */
+  public void testNoAckConflation() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "1");
+    doCreateOtherVm(p, false);
+
+    forceQueuing(r);
+    final Object key = "key";
+    int count = 0;
+//    long queuedMsgs = stats.getAsyncQueuedMsgs();
+//    long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    final long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    long queueSize = stats.getAsyncQueueSize();
+    String lastValue = "";
+    final long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    long start = 0;
+    try {
+      while ((stats.getAsyncConflatedMsgs()-initialConflatedMsgs) < 1000) {
+        String value = "count=" + count;
+        lastValue = value;
+        r.put(key, value);
+        count ++;
+        //       getLogWriter().info("After " + count + " "
+        //                           + " puts queueSize=" + queueSize
+        //                           + "    queuedMsgs=" + queuedMsgs
+        //                           + "  dequeuedMsgs=" + dequeuedMsgs
+        //                           + " conflatedMsgs=" + conflatedMsgs);
+      }
+      start = System.currentTimeMillis();
+    } finally {
+      forceQueueFlush();
+    }
+//     queueSize = stats.getAsyncQueueSize();
+//     queuedMsgs = stats.getAsyncQueuedMsgs();
+
+//     getLogWriter().info("After " + count + " "
+//                         + " puts slowrec mode kicked in by queuing "
+//                         + queuedMsgs + " for a total size of " + queueSize
+//                         + " conflatedMsgs=" + conflatedMsgs
+//                         + " dequeuedMsgs=" + dequeuedMsgs);
+//     final long start = System.currentTimeMillis();
+//     while (stats.getAsyncQueuedMsgs() > stats.getAsyncDequeuedMsgs()) {
+//       try {Thread.sleep(100);} catch (InterruptedException ignore) {}
+//       queueSize = stats.getAsyncQueueSize();
+//       queuedMsgs = stats.getAsyncQueuedMsgs();
+//       dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//       conflatedMsgs = stats.getAsyncConflatedMsgs();
+//       getLogWriter().info("After sleeping"
+//                           + "     queueSize=" + queueSize
+//                           + "    queuedMsgs=" + queuedMsgs
+//                           + "  dequeuedMsgs=" + dequeuedMsgs
+//                           + " conflatedMsgs=" + conflatedMsgs);
+    final long finish = System.currentTimeMillis();
+    getLogWriter().info("After " + (finish - start) + " ms async msgs where flushed. A total of " + (stats.getAsyncDequeuedMsgs()-intialDeQueuedMsgs) + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The lastValue was " + lastValue);
+    
+    checkLastValueInOtherVm(lastValue, null);
+  }
+  /**
+   * Make sure an ack put does not hang.
+   * Make sure two ack updates do not conflate but are both queued.
+   */
+  public void testAckConflation() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final Region ar = createAckRegion(false, true);
+    ar.create("ackKey", "ackValue");
+    
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "2");
+    doCreateOtherVm(p, false);
+
+    forceQueuing(r);
+    {
+      // make sure ack does not hang
+      // make sure two ack updates do not conflate but are both queued
+      long startQueuedMsgs = stats.getAsyncQueuedMsgs();
+      long startConflatedMsgs = stats.getAsyncConflatedMsgs();
+      Thread t = new Thread(new Runnable() {
+          public void run() {
+            ar.put("ackKey", "ackValue");
+          }
+        });
+      t.start();
+      Thread t2 = new Thread(new Runnable() {
+          public void run() {
+            ar.put("ackKey", "ackValue");
+          }
+        });
+      t2.start();
+      // give threads a chance to get queued
+      try {Thread.sleep(100);} catch (InterruptedException ignore) {fail("interrupted");}
+      forceQueueFlush();
+      DistributedTestCase.join(t, 2 * 1000, getLogWriter());
+      DistributedTestCase.join(t2, 2 * 1000, getLogWriter());
+      long endQueuedMsgs = stats.getAsyncQueuedMsgs();
+      long endConflatedMsgs = stats.getAsyncConflatedMsgs();
+      assertEquals(startConflatedMsgs, endConflatedMsgs);
+      // queue should be flushed by the time we get an ack
+      assertEquals(endQueuedMsgs, stats.getAsyncDequeuedMsgs());
+      assertEquals(startQueuedMsgs+2, endQueuedMsgs);
+    }
+  }
+  /**
+   * Make sure that only sequences of updates are conflated.
+   * Also checks that sending to a conflating region and a non-conflating
+   * region does the correct thing.
+   * Test disabled because it intermittently fails due to race conditions
+   * in the test. This has been fixed in congo's tests. See bug 35357.
+   */
+  public void _disabled_testConflationSequence() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    factory.setEnableAsyncConflation(false);
+    final Region noConflate = createRootRegion("noConflate", factory.create());
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "1");
+    doCreateOtherVm(p, false);
+    {
+      VM vm = getOtherVm();
+      vm.invoke(new CacheSerializableRunnable("create noConflate") {
+          public void run2() throws CacheException {
+            AttributesFactory af = new AttributesFactory();
+            af.setScope(Scope.DISTRIBUTED_NO_ACK);
+            af.setDataPolicy(DataPolicy.REPLICATE);
+            createRootRegion("noConflate", af.create());
+          }
+        });
+    }
+
+    // now make sure update+destroy does not conflate
+    final Object key = "key";      
+    getLogWriter().info("[testConflationSequence] about to force queuing");
+    forceQueuing(r);
+
+    int count = 0;
+    String value = "";
+    String lastValue = value;
+    Object mylcb = null;
+    long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//    long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    int endCount = count+60;
+
+    getLogWriter().info("[testConflationSequence] about to build up queue");
+    long begin = System.currentTimeMillis();
+    while (count < endCount) {
+      value = "count=" + count;
+      lastValue = value;
+      r.create(key, value);
+      count ++;
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      mylcb = value;
+      r.destroy(key, mylcb);
+      count ++;
+      lastValue = null;
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+    forceQueueFlush();
+    checkLastValueInOtherVm(lastValue, mylcb);
+
+    // now make sure create+update+localDestroy does not conflate
+    getLogWriter().info("[testConflationSequence] force queuing create-update-destroy");
+    forceQueuing(r);
+    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    endCount = count + 40;
+    
+    getLogWriter().info("[testConflationSequence] create-update-destroy");
+    begin = System.currentTimeMillis();
+    while (count < endCount) {
+      value = "count=" + count;
+      lastValue = value;
+      r.create(key, value);
+      count++;
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      r.localDestroy(key);
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+    forceQueueFlush();
+    checkLastValueInOtherVm(lastValue, null);
+
+    // now make sure update+invalidate does not conflate
+    getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
+    forceQueuing(r);
+    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    value = "count=" + count;
+    lastValue = value;
+    r.create(key, value);
+    count++;
+//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    endCount = count + 40;
+
+    getLogWriter().info("[testConflationSequence] update-invalidate");
+    begin = System.currentTimeMillis();
+    while (count < endCount) {
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      r.invalidate(key);
+      count ++;
+      lastValue = CHECK_INVALID;
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+    forceQueueFlush();
+    getLogWriter().info("[testConflationSequence] assert other vm");
+    checkLastValueInOtherVm(lastValue, null);
+
+    r.destroy(key);
+
+    // now make sure updates to a conflating region are conflated even while
+    // updates to a non-conflating are not.
+    getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
+    forceQueuing(r);
+    final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
+//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    
+    value = "count=" + count;
+    lastValue = value;
+    long conflatedMsgs = stats.getAsyncConflatedMsgs();
+    long queuedMsgs = stats.getAsyncQueuedMsgs();
+    r.create(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    r.put(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    noConflate.create(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    noConflate.put(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    count++;
+//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    endCount = count + 80;
+
+    begin = System.currentTimeMillis();
+    getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
+                        + " queuedMsgs=" + stats.getAsyncQueuedMsgs()
+                        + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
+                        + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
+                        + " asyncSocketWrites=" + stats.getAsyncSocketWrites()
+                        );
+    while (count < endCount) {
+      // make sure we continue to have a flush in progress
+      assertEquals(1, stats.getAsyncThreads());
+      assertEquals(1, stats.getAsyncQueues());
+      assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
+      // make sure we are not completing any flushing while this loop is in progress
+      assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      // make sure it was conflated and not queued
+      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+      conflatedMsgs++;
+      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+      noConflate.put(key, value);
+      // make sure it was queued and not conflated
+      queuedMsgs++;
+      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+
+    forceQueueFlush();
+    getLogWriter().info("[testConflationSequence] assert other vm");
+    checkLastValueInOtherVm(lastValue, null);
+  }
+  /**
+   * Make sure that exceeding the queue size limit causes a disconnect.
+   */
+  public void testSizeDisconnect() throws CacheException {
+    final String expected = 
+      "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
+      "||java.io.IOException: Broken pipe";
+    final String addExpected = 
+      "<ExpectedException action=add>" + expected + "</ExpectedException>";
+    final String removeExpected = 
+      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    // set others before vm0 connects
+    final Set others = dm.getOtherDistributionManagerIds();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-max-queue-size", "1"); // 1 meg
+    doCreateOtherVm(p, false);
+
+    
+    final Object key = "key";
+    final int VALUE_SIZE = 1024 * 100; // 0.1M; the 1M async-max-queue-size should give us 10 of these 100K msgs before the queue is full
+    final byte[] value = new byte[VALUE_SIZE];
+    int count = 0;
+    forceQueuing(r);
+    long queuedMsgs = stats.getAsyncQueuedMsgs();
+    long queueSize = stats.getAsyncQueueSize();
+    
+    getCache().getLogger().info(addExpected);
+    try {    
+      while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
+        r.put(key, value);
+        count ++;
+        if (stats.getAsyncQueueSize() > 0) {
+          queuedMsgs = stats.getAsyncQueuedMsgs();
+          queueSize = stats.getAsyncQueueSize();
+        }
+        if (count > 100) {
+          fail("should have exceeded max-queue-size by now");
+        }
+      }
+      getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
+      // make sure we lost a connection to vm0
+      WaitCriterion ev = new WaitCriterion() {
+        public boolean done() {
+          return dm.getOtherDistributionManagerIds().size() <= others.size()
+              && stats.getAsyncQueueSize() == 0;
+        }
+        public String description() {
+          return "waiting for connection loss";
+        }
+      };
+      DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+    }
+    finally {
+      forceQueueFlush();
+      getCache().getLogger().info(removeExpected);
+    }
+    assertEquals(others, dm.getOtherDistributionManagerIds());
+    assertEquals(0, stats.getAsyncQueueSize());
+  }
+  /**
+   * Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
+   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
+   * in build.xml in the splitbrainNov07 branch.  It had been disabled since
+   * June 2006 due to hangs.  Some of the tests, like this one, still need
+   * work because they periodically (some quite often) fail.
+   */
+  public void donottestTimeoutDisconnect() throws CacheException {
+    final String expected = 
+      "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced disconnect sent to" +
+      "||java.io.IOException: Broken pipe";
+    final String addExpected = 
+      "<ExpectedException action=add>" + expected + "</ExpectedException>";
+    final String removeExpected = 
+      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+      
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    // set others before vm0 connects
+    final Set others = dm.getOtherDistributionManagerIds();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-queue-timeout", "500"); // 500 ms
+    doCreateOtherVm(p, true);
+
+    
+    final Object key = "key";
+    final int VALUE_SIZE = 1024; // 1k
+    final byte[] value = new byte[VALUE_SIZE];
+    int count = 0;
+    long queuedMsgs = stats.getAsyncQueuedMsgs();
+    long queueSize = stats.getAsyncQueueSize();
+    final long timeoutLimit = System.currentTimeMillis() + 5000;
+
+    getCache().getLogger().info(addExpected);
+    try {    
+      while (stats.getAsyncQueueTimeouts() == 0) {
+        r.put(key, value);
+        count ++;
+        if (stats.getAsyncQueueSize() > 0) {
+          queuedMsgs = stats.getAsyncQueuedMsgs();
+          queueSize = stats.getAsyncQueueSize();
+        }
+        if (System.currentTimeMillis() > timeoutLimit) {
+          fail("should have exceeded async-queue-timeout by now");
+        }
+      }
+      getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts slowrec mode kicked in but the queue filled when its size reached " + queueSize + " with " + queuedMsgs + " msgs");
+      // make sure we lost a connection to vm0
+      WaitCriterion ev = new WaitCriterion() {
+        public boolean done() {
+          if (dm.getOtherDistributionManagerIds().size() > others.size()) {
+            return false;
+          }
+          return stats.getAsyncQueueSize() == 0;
+        }
+        public String description() {
+          return "waiting for departure";
+        }
+      };
+      DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+    }
+    finally {
+      getCache().getLogger().info(removeExpected);
+    }
+    assertEquals(others, dm.getOtherDistributionManagerIds());
+    assertEquals(0, stats.getAsyncQueueSize());
+  }
+
+  // static helper methods ---------------------------------------------------
+  
+  private static final String KEY_SLEEP = "KEY_SLEEP";
+  private static final String KEY_WAIT = "KEY_WAIT";
+  private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
+  
+  protected final static int CALLBACK_CREATE = 0;
+  protected final static int CALLBACK_UPDATE = 1;
+  protected final static int CALLBACK_INVALIDATE = 2;
+  protected final static int CALLBACK_DESTROY = 3;
+  protected final static int CALLBACK_REGION_INVALIDATE = 4;
+  
+  protected final static Integer CALLBACK_CREATE_INTEGER = new Integer(CALLBACK_CREATE);
+  protected final static Integer CALLBACK_UPDATE_INTEGER = new Integer(CALLBACK_UPDATE);
+  protected final static Integer CALLBACK_INVALIDATE_INTEGER = new Integer(CALLBACK_INVALIDATE);
+  protected final static Integer CALLBACK_DESTROY_INTEGER = new Integer(CALLBACK_DESTROY);
+  protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = new Integer(CALLBACK_REGION_INVALIDATE);
+
+  private static class CallbackWrapper {
+    public final Object callbackArgument;
+    public final int callbackType;
+    public CallbackWrapper(Object callbackArgument, int callbackType) {
+      this.callbackArgument = callbackArgument;
+      this.callbackType = callbackType;
+    }
+    public String toString() {
+      return "CallbackWrapper: " + callbackArgument.toString() + " of type " + callbackType;
+    }
+  }
+  
+  protected static class ControlListener extends CacheListenerAdapter {
+    public final LinkedList callbackArguments = new LinkedList();
+    public final LinkedList callbackTypes = new LinkedList();
+    public final Object CONTROL_LOCK = new Object();
+    
+    public void afterCreate(EntryEvent event) {
+      getLogWriter().info(event.getRegion().getName() + " afterCreate " + event.getKey());
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
+          this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+      processEvent(event);
+    }
+    public void afterUpdate(EntryEvent event) {
+      getLogWriter().info(event.getRegion().getName() + " afterUpdate " + event.getKey());
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
+          this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+      processEvent(event);
+    }
+    public void afterInvalidate(EntryEvent event) {
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_INVALIDATE));
+          this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+    }
+    public void afterDestroy(EntryEvent event) {
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_DESTROY));
+          this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+    }
+    public void afterRegionInvalidate(RegionEvent event) {
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE));
+          this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+    }
+    private void processEvent(EntryEvent event) {
+      if (event.getKey().equals(KEY_SLEEP)) {
+        processSleep(event);
+      }
+      else if (event.getKey().equals(KEY_WAIT)) {
+        processWait(event);
+      }
+      else if (event.getKey().equals(KEY_DISCONNECT)) {
+        processDisconnect(event);
+      }
+    }
+    private void processSleep(EntryEvent event) {
+      int sleepMs = ((Integer)event.getNewValue()).intValue();
+      getLogWriter().info("[processSleep] sleeping for " + sleepMs);
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException ignore) {fail("interrupted");}
+    }
+    private void processWait(EntryEvent event) {
+      int sleepMs = ((Integer)event.getNewValue()).intValue();
+      getLogWriter().info("[processWait] waiting for " + sleepMs);
+      synchronized(this.CONTROL_LOCK) {
+        try {
+          this.CONTROL_LOCK.wait(sleepMs);
+        } catch (InterruptedException ignore) {return;}
+      }
+    }
+    private void processDisconnect(EntryEvent event) {
+      getLogWriter().info("[processDisconnect] disconnecting");
+      disconnectFromDS();
+    }
+  }
+
+  /**
+   * Make sure multiple no-ack regions conflate properly.
+   * [bruce] disabled when use of this dunit test class was reenabled in
+   * the splitbrainNov07 branch.  The class had been disabled since
+   * June 2006 r13222 in the trunk.  This test is failing because conflation
+   * isn't kicking in for some reason.
+   */
+  public void donottestMultipleRegionConflation() throws Throwable {
+    try {
+      doTestMultipleRegionConflation();
+    }
+    catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      getLogWriter().error("Encountered exception: ", t);
+      throw t;
+    }
+    finally {
+      // make sure other vm was notified even if test failed
+      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+        public void run() {
+          synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+            doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
+          }
+        }
+      });
+    }
+  }
+  protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
+  protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
+  private void doTestMultipleRegionConflation() throws Exception {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r1 = createRootRegion("slowrec1", factory.create());
+    final Region r2 = createRootRegion("slowrec2", factory.create());
+    
+    assertTrue(getSystem().isConnected());
+    assertNotNull(r1);
+    assertFalse(r1.isDestroyed());
+    assertNotNull(getCache());
+    assertNotNull(getCache().getRegion("slowrec1"));
+    assertNotNull(r2);
+    assertFalse(r2.isDestroyed());
+    assertNotNull(getCache());
+    assertNotNull(getCache().getRegion("slowrec2"));
+    
+    final DM dm = getSystem().getDistributionManager();
+    final Serializable controllerVM = dm.getDistributionManagerId();
+    final DMStats stats = dm.getStats();
+    final int millisToWait = 1000 * 60 * 5; // 5 minutes
+    
+    // set others before vm0 connects
+    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+
+    // create receiver in vm0 with queuing enabled
+    final Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-queue-timeout", "86400000"); // max value
+    p.setProperty("async-max-queue-size", "1024"); // max value
+
+    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+      public void run2() throws CacheException {
+        getSystem(p);
+        
+        DM dm = getSystem().getDistributionManager();
+        assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
+        
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_NO_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        
+        doTestMultipleRegionConflation_R1_Listener = new ControlListener();
+        af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
+        createRootRegion("slowrec1", af.create());
+        
+        doTestMultipleRegionConflation_R2_Listener = new ControlListener();
+        af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
+        createRootRegion("slowrec2", af.create());
+      }
+    });
+    
+    // put vm0 cache listener into wait
+    getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 into wait");
+    r1.put(KEY_WAIT, new Integer(millisToWait));
+
+    // build up queue size
+    getLogWriter().info("[doTestMultipleRegionConflation] building up queue size...");
+    final Object key = "key";
+    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+    final int VALUE_SIZE = socketBufferSize*3;
+    //final int VALUE_SIZE = 1024 * 1024  ; // 1 MB
+    final byte[] value = new byte[VALUE_SIZE];
+
+    int count = 0;
+    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+      count++;
+      r1.put(key, value);
+    }
+    
+    getLogWriter().info("[doTestMultipleRegionConflation] After " + 
+      count + " puts of size " + VALUE_SIZE + 
+      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+    // put values that will be asserted
+    final Object key1 = "key1";
+    final Object key2 = "key2";
+    Object putKey = key1;
+    boolean flag = true;
+    for (int i = 0; i < 30; i++) {
+      if (i == 10) putKey = key2;
+      if (flag) {
+        if (i == 6) {
+          r1.invalidate(putKey, new Integer(i));
+        } else if (i == 24) {
+          r1.invalidateRegion(new Integer(i));
+        } else {
+          r1.put(putKey, value, new Integer(i));
+        }
+      } else {
+        if (i == 15) {
+          r2.destroy(putKey, new Integer(i));
+        } else {
+          r2.put(putKey, value, new Integer(i));
+        }
+      }
+      flag = !flag;
+    }
+    
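+    // Creates, invalidates, destroys and region operations are not eligible
+    // for conflation, so after the queue flushes only those events plus the
+    // latest update per key should reach the listeners, as enumerated below.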
+    // r1: key1, 0, create
+    // r1: key1, 4, update
+    // r1: key1, 6, invalidate
+    // r1: key1, 8, update
+    
+    // r1: key2, 10, create
+    // r1:       24, invalidateRegion
+    // r1: key2, 28, update
+
+    // r2: key1, 1, create
+    // r2: key1, 9, update
+    
+    // r2: key2, 11, create
+    // r2: key2, 13, update
+    // r2: key2, 15, destroy
+    // r2: key2, 17, create
+    // r2: key2, 29, update
+    
+    final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 }; 
+    final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
+      { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
+        CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE }; 
+    
+    final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
+    final int[] r2ExpectedTypes = new int[] 
+      { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
+        CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE }; 
+
+    // send notify to vm0
+    getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+      public void run() {
+        synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+          doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
+        }
+      }
+    });
+    
+    // wait for queue to be flushed
+    getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
+      public void run() {
+        try {
+          synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+            while (doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < r1ExpectedArgs.length) {
+              doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
+            }
+          }
+          synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
+            while (doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < r2ExpectedArgs.length) {
+              doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
+            }
+          }
+        } catch (InterruptedException ignore) {fail("interrupted");}
+      }
+    });
+    
+    // assert values on both listeners
+    getLogWriter().info("[doTestMultipleRegionConflation] assert callback arguments");
+    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
+      public void run() {
+        synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+          getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments=" + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
+          getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
+          assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
+                       doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
+          int i = 0;
+          for (Iterator iter = doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter.hasNext();) {
+            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
+            assertEquals(new Integer(r1ExpectedArgs[i]), 
+              wrapper.callbackArgument);
+            assertEquals(new Integer(r1ExpectedTypes[i]), 
+              doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
+            i++;
+          }
+        }
+        synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
+          getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments=" + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
+          getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
+          assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
+                       doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
+          int i = 0;
+          for (Iterator iter = doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter.hasNext();) {
+            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
+            assertEquals(new Integer(r2ExpectedArgs[i]), 
+              wrapper.callbackArgument);
+            assertEquals(new Integer(r2ExpectedTypes[i]), 
+              doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
+            i++;
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Make sure a disconnect causes queue memory to be released.
+   */
+  public void testDisconnectCleanup() throws Throwable {
+    try {
+      doTestDisconnectCleanup();
+    }
+    catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      getLogWriter().error("Encountered exception: ", t);
+      throw t;
+    }
+    finally {
+      // make sure other vm was notified even if test failed
+      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+        public void run() {
+          synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
+            doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
+          }
+        }
+      });
+    }
+  }
+  protected static ControlListener doTestDisconnectCleanup_Listener;
+  private void doTestDisconnectCleanup() throws Exception {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    // set others before vm0 connects
+    final Set others = dm.getOtherDistributionManagerIds();
+    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+    final int initialQueues = stats.getAsyncQueues();
+
+    // create receiver in vm0 with queuing enabled
+    final Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-queue-timeout", "86400000"); // max value
+    p.setProperty("async-max-queue-size", "1024"); // max value
+    
+    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+      public void run2() throws CacheException {
+        getSystem(p);
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_NO_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        
+        doTestDisconnectCleanup_Listener = new ControlListener();
+        af.setCacheListener(doTestDisconnectCleanup_Listener);
+        createRootRegion("slowrec", af.create());
+      }
+    });
+
+    // put vm0 cache listener into wait
+    getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
+    int millisToWait = 1000 * 60 * 5; // 5 minutes
+    r.put(KEY_WAIT, new Integer(millisToWait));
+    r.put(KEY_DISCONNECT, KEY_DISCONNECT);
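+    // KEY_DISCONNECT is queued behind KEY_WAIT, so once the listener is
+    // notified out of its wait it will process the disconnect and drop vm0
+    // from the distributed system, which should release the queue memory.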
+
+    // build up queue size
+    getLogWriter().info("[testDisconnectCleanup] building up queue size...");
+    final Object key = "key";
+    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+    final int VALUE_SIZE = socketBufferSize*3;
+    //final int VALUE_SIZE = 1024 * 1024  ; // 1 MB
+    final byte[] value = new byte[VALUE_SIZE];
+
+    int count = 0;
+    final long abortMillis = System.currentTimeMillis() + millisToWait;
+    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+      count++;
+      r.put(key, value);
+      assertFalse(System.currentTimeMillis() >= abortMillis);
+    }
+    
+    getLogWriter().info("[testDisconnectCleanup] After " + 
+      count + " puts of size " + VALUE_SIZE + 
+      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+    while (stats.getAsyncQueuedMsgs() < 10 ||
+           stats.getAsyncQueueSize() < VALUE_SIZE*10) {
+      count++;
+      r.put(key, value);
+      assertFalse(System.currentTimeMillis() >= abortMillis);
+    }
+    assertTrue(stats.getAsyncQueuedMsgs() >= 10);
+
+    while (stats.getAsyncQueues() < 1) {
+      pause(100);
+      assertFalse(System.currentTimeMillis() >= abortMillis);
+    }
+    
+    getLogWriter().info("[testDisconnectCleanup] After " + 
+      count + " puts of size " + VALUE_SIZE + " queue size has reached " + 
+      stats.getAsyncQueueSize() + " bytes and number of queues is " + 
+      stats.getAsyncQueues() + ".");
+
+    assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
+    assertEquals(initialQueues+1, stats.getAsyncQueues());
+
+    // assert vm0 is still connected
+    assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
+    
+    // send notify to vm0
+    getLogWriter().info("[testDisconnectCleanup] wake up vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+      public void run() {
+        synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
+          doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
+        }
+      }
+    });
+    
+    // make sure we lost a connection to vm0
+    getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return dm.getOtherDistributionManagerIds().size() <= others.size();
+      }
+      public String description() {
+        return "waiting for disconnect";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+    assertEquals(others, dm.getOtherDistributionManagerIds());
+    
+    // check free memory... perform wait loop with System.gc
+    getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
+    ev = new WaitCriterion() {
+      public boolean done() {
+        if (stats.getAsyncQueues() <= initialQueues) {
+          return true;
+        }
+        Runtime.getRuntime().gc();
+        return false;
+      }
+      public String description() {
+        return "waiting for queue cleanup";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+//    getLogWriter().info("[testDisconnectCleanup] initialQueues=" + 
+//      initialQueues + " asyncQueues=" + stats.getAsyncQueues());
+    assertEquals(initialQueues, stats.getAsyncQueues());
+  }
+
+  /**
+   * Make sure a partially transmitted message is handled correctly when
+   * conflation kicks in.<p>
+   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
+   * in build.xml in the splitbrainNov07 branch.  It had been disabled since
+   * June 2006 due to hangs.  Some of the tests, like this one, still need
+   * work because they periodically (some quite often) fail.
+   */
+  public void donottestPartialMessage() throws Throwable {
+    try {
+      doTestPartialMessage();
+    }
+    catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      getLogWriter().error("Encountered exception: ", t);
+      throw t;
+    }
+    finally {
+      // make sure other vm was notified even if test failed
+      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+        public void run() {
+          synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+            doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
+          }
+        }
+      });
+    }
+  }
+  protected static ControlListener doTestPartialMessage_Listener;
+  private void doTestPartialMessage() throws Exception {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    
+    // set others before vm0 connects
+//    final Set others = dm.getOtherDistributionManagerIds();
+    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+//    int initialQueues = stats.getAsyncQueues();
+
+    // create receiver in vm0 with queuing enabled
+    final Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4 sec
+    p.setProperty("async-queue-timeout", "86400000"); // max value
+    p.setProperty("async-max-queue-size", "1024"); // max value
+    
+    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+      public void run2() throws CacheException {
+        getSystem(p);
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_NO_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        
+        doTestPartialMessage_Listener = new ControlListener();
+        af.setCacheListener(doTestPartialMessage_Listener);
+        createRootRegion("slowrec", af.create());
+      }
+    });
+
+    // put vm0 cache listener into wait
+    getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
+    final int millisToWait = 1000 * 60 * 5; // 5 minutes
+    r.put(KEY_WAIT, new Integer(millisToWait));
+
+    // build up queue size
+    getLogWriter().info("[testPartialMessage] building up queue size...");
+    final Object key = "key";
+    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+    final int VALUE_SIZE = socketBufferSize*3;
+    //1024 * 20; // 20 KB
+    final byte[] value = new byte[VALUE_SIZE];
+
+    int count = 0;
+    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+      count++;
+      r.put(key, value, new Integer(count));
+    }
+    
+    final int partialId = count;
+    assertEquals(0, stats.getAsyncConflatedMsgs());
+    
+    getLogWriter().info("[testPartialMessage] After " + 
+      count + " puts of size " + VALUE_SIZE + 
+      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+    pause(2000);
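+    // The message that was being transmitted when queuing kicked in was only
+    // partially written to the socket, so that entry (partialId) can never be
+    // conflated away; subsequent puts on the same key conflate normally.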
+      
+    // conflate 10 times
+    while (stats.getAsyncConflatedMsgs() < 10) {
+      count++;
+      r.put(key, value, new Integer(count));
+      if (count == partialId+1) {
+//        long begin = System.currentTimeMillis();
+//        while (stats.getAsyncQueues() < 1) {
+//          pause(100);
+//          assertFalse(System.currentTimeMillis() > begin+1000*10);
+//        }
+        assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
+        assertEquals(0, stats.getAsyncConflatedMsgs());
+      } else if (count == partialId+2) {
+        assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
+        assertEquals(1, stats.getAsyncConflatedMsgs());
+      }
+    }
+    
+    final int conflateId = count;
+    
+    final int[] expectedArgs = { partialId, conflateId };
+
+    // send notify to vm0
+    getLogWriter().info("[testPartialMessage] wake up vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+      public void run() {
+        synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+          doTestPartialMessage_Listener.CONTROL_LOCK.notify();
+        }
+      }
+    });
+    
+    // wait for queue to be flushed
+    getLogWriter().info("[testPartialMessage] wait for vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
+      public void run() {
+        try {
+          synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+            boolean done = false;
+            while (!done) {
+              if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
+                CallbackWrapper last = (CallbackWrapper)
+                  doTestPartialMessage_Listener.callbackArguments.getLast();
+                Integer lastId = (Integer) last.callbackArgument;
+                if (lastId.intValue() == conflateId) {
+                  done = true;
+                } else {
+                  doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
+                }
+              } else {
+                doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
+              }
+            }
+          }
+        } catch (InterruptedException ignore) {fail("interrupted");}
+      }
+    });
+    
+    // assert values on both listeners
+    getLogWriter().info("[testPartialMessage] assert callback arguments");
+    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
+      public void run() {
+        synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+          getLogWriter().info("[testPartialMessage] " +
+              "doTestPartialMessage_Listener.callbackArguments=" + 
+              doTestPartialMessage_Listener.callbackArguments);
+              
+          assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
+                       doTestPartialMessage_Listener.callbackTypes.size());
+                       
+          int i = 0;
+          Iterator argIter = 
+            doTestPartialMessage_Listener.callbackArguments.iterator();
+          Iterator typeIter = 
+            doTestPartialMessage_Listener.callbackTypes.iterator();
+            
+          while (argIter.hasNext()) {
+            CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
+            Integer arg = (Integer) wrapper.callbackArgument;
+            typeIter.next(); // Integer type
+            if (arg.intValue() < partialId) {
+              continue;
+            }
+            assertEquals(new Integer(expectedArgs[i]), arg);
+            //assertEquals(CALLBACK_UPDATE_INTEGER, type);
+            i++;
+          }
+        }
+      }
+    });
+    
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
index 41b3042..55a63ff 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
@@ -24,7 +24,11 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.junit.experimental.categories.Category;
+
 import com.gemstone.gemfire.distributed.internal.membership.*;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
 import junit.framework.AssertionFailedError;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java
new file mode 100755
index 0000000..e07e9fc
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDUnitTest.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.distributed;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class KirkDUnitTest {
+
+  @Test
+  public void testMe() {
+    Assert.assertTrue(true);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java
new file mode 100755
index 0000000..8e267db
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkDistributedTestSuite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Suite of tests for distributed membership dunit tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+   //DistributedMemberDUnitTest.class,
+   KirkDUnitTest.class,
+})
+public class KirkDistributedTestSuite {
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java
new file mode 100755
index 0000000..f39fd05
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/KirkIntegrationTestSuite.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * @author Kirk Lund
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+   LauncherMemberMXBeanJUnitTest.class,
+})
+public class KirkIntegrationTestSuite {
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java
deleted file mode 100644
index 67de3e3..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitDisabledTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.tcpserver;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Properties;
-import java.util.Vector;
-
-import junit.framework.Assert;
-
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.distributed.Locator;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.Version;
-
-import dunit.DistributedTestCase;
-import dunit.Host;
-import dunit.VM;
-
-/**
- * This tests the rolling upgrade for locators with
- * different GOSSIPVERSION.
- *
- * @author shobhit
- *
- */
-public class TcpServerBackwardCompatDUnitDisabledTest extends DistributedTestCase {
-
-  /**
-   * @param name
-   */
-  public TcpServerBackwardCompatDUnitDisabledTest(String name) {
-    super(name);
-  }
-
-  @Override
-  public void setUp() throws Exception {
-    super.setUp();
-    disconnectAllFromDS();
-    invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting true") {
-      
-      @Override
-      public void run2() throws CacheException {
-        TcpServer.isTesting = true;
-      }
-    });
-  }
-
-  @Override
-  public void tearDown2() throws Exception {
-    invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting false") {
-      
-      @Override
-      public void run2() throws CacheException {
-        TcpServer.isTesting = false;
-      }
-    });
-    super.tearDown2();
-  }
-
-  /**
-   * This test starts two locators with the current GOSSIPVERSION,
-   * then shuts down one of them and restarts it with the new
-   * GOSSIPVERSION, verifying that it has recovered the system
-   * view. Then we upgrade the next locator.
-   */
-  public void testGossipVersionBackwardCompatibility() {
-    Host host = Host.getHost(0);
-    final VM locator0 = host.getVM(0);
-    final VM locator1 = host.getVM(1);
-    final VM locatorRestart0 = host.getVM(2);
-    final VM member = host.getVM(3);
-
-    // Create properties for locator0
-    final int port0 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final File logFile0 = new File(getUniqueName() + "-locator" + port0 + ".log");
-    
-    // Create properties for locator1
-    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    while (port == port0) {
-      port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    }
-    final int port1 = port;
-
-    final File logFile1 = new File(getUniqueName() + "-locator" + port1 + ".log");
-    
-    final String locators = host.getHostName() + "[" + port0 + "]," +
-                            host.getHostName() + "[" + port1 + "]";
-    
-    final Properties props = new Properties();
-    props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-    
-    // Start locator0 with props.
-    //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
-    locator0.invoke(new CacheSerializableRunnable("Starting first locator on port " + port0) {
-      
-      @Override
-      public void run2() throws CacheException {
-        try {
-          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
-          
-          Locator.startLocatorAndDS(port0, logFile0, props);
-        } catch (IOException e) {
-          fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
-        }
-      }
-    });
-
-    // Start a new member to add it to discovery set of locator0.
-    member.invoke(new CacheSerializableRunnable("Start a member") {
-      
-      @Override
-      public void run2() throws CacheException {
-        disconnectFromDS();
-        TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
-        InternalDistributedSystem.connect(props);
-      }
-    });
-
-    // Start locator1 with props.
-    //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port1+"]");
-    locator1.invoke(new CacheSerializableRunnable("Starting second locator on port " + port1) {
-
-      @Override
-      public void run2() throws CacheException {
-        try {
-          TcpServer.TESTVERSION -= 100;
-          TcpServer.OLDTESTVERSION -= 100;
-          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
-          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
-          assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
-          assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
-
-          Locator.startLocatorAndDS(port1, logFile1, props);
-
-          // Start a gossip client to connect to first locator "locator0".
-          fail("this test must be fixed to work with the jgroups replacement");
-          // TODO
-//          final GossipClient client = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1),  500);
-//          client.register("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), 5000, false);
-
-          WaitCriterion ev = new WaitCriterion() {
-            public boolean done() {
-              try {
-                // TODO
-//                Vector members = client.getMembers("mygroup1", 
-//                    new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
-//                return members.size() == 2;
-              }
-              catch (Exception e) {
-                e.printStackTrace();
-                fail("unexpected exception");
-              }
-              return false; // NOTREACHED
-            }
-            public String description() {
-              return null;
-            }
-          };
-          
-          DistributedTestCase.waitForCriterion(ev, 1000, 200, true);
-          fail("this test must be fixed to work with the jgroups replacement");
-          // TODO
-//          Vector members = client.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
-//          Assert.assertEquals(2, members.size());
-//          Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port0)));
-//          Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port1)));
-
-        } catch (IOException e) {
-          fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
-        }
-      }
-    });
-
-    // Stop first locator currently running in locator0 VM.
-    locator0.invoke(new CacheSerializableRunnable("Stopping first locator") {
-      
-      @Override
-      public void run2() throws CacheException {
-        Locator.getLocator().stop();
-        disconnectFromDS();
-      }
-    });
-    
-    // Restart first locator in new VM.
-    //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
-    locatorRestart0.invoke(new CacheSerializableRunnable("Restarting first locator on port " + port0) {
-      
-      @Override
-      public void run2() throws CacheException {
-        try {
-          TcpServer.TESTVERSION -= 100;
-          TcpServer.OLDTESTVERSION -= 100;
-          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
-          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
-          assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
-          assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
-
-          Locator.startLocatorAndDS(port0, logFile0, props);
-
-          // A new gossip client with new GOSSIPVERSION must be able
-          // to connect with new locator on port1, remote locator.
-          // Reuse locator0 VM.
-          fail("this test must be fixed to work with the jgroups replacement");
-          // TODO
-//          final GossipClient client2 = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1),  500);
-//          Vector<IpAddress> members = client2.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), true, 5000);
-//          Assert.assertEquals(2, members.size());
-          // As they are coming from other locator, their pid is of other locator process.
-//          getLogWriter().info(members.get(0) + " " + members.get(1));
-
-          // TODO
-//          for (IpAddress ipAddr : members) {
-//            int port = ipAddr.getPort();
-//            String hostname = ipAddr.getIpAddress().getHostAddress();
-//            int pid = ipAddr.getProcessId();
-//            Assert.assertTrue(" " + ipAddr, port == port0 || port == port1);
-//            Assert.assertTrue(" " + ipAddr, hostname.equals(InetAddress.getLocalHost().getHostAddress()));
-//            Assert.assertTrue(" " + ipAddr, pid == locator1.getPid());
-//          }
-
-        } catch (IOException e) {
-          fail("Locator0 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
-        }
-      }
-    });
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f077657/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java
new file mode 100644
index 0000000..2f5b80b
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServerBackwardCompatDUnitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed.internal.tcpserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.Vector;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+import junit.framework.Assert;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpServer;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * This tests the rolling upgrade for locators with
+ * different GOSSIPVERSION.
+ *
+ * @author shobhit
+ *
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class TcpServerBackwardCompatDUnitTest extends DistributedTestCase {
+
+  /**
+   * @param name
+   */
+  public TcpServerBackwardCompatDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    disconnectAllFromDS();
+    invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting true") {
+      
+      @Override
+      public void run2() throws CacheException {
+        TcpServer.isTesting = true;
+      }
+    });
+  }
+
+  @Override
+  public void tearDown2() throws Exception {
+    invokeInEveryVM(new CacheSerializableRunnable("Set TcpServer.isTesting false") {
+      
+      @Override
+      public void run2() throws CacheException {
+        TcpServer.isTesting = false;
+      }
+    });
+    super.tearDown2();
+  }
+
+  /**
+   * This test starts two locators with the current GOSSIPVERSION,
+   * then shuts down one of them and restarts it with the new
+   * GOSSIPVERSION, verifying that it has recovered the system
+   * view. Then we upgrade the next locator.
+   */
+  public void testGossipVersionBackwardCompatibility() {
+    Host host = Host.getHost(0);
+    final VM locator0 = host.getVM(0);
+    final VM locator1 = host.getVM(1);
+    final VM locatorRestart0 = host.getVM(2);
+    final VM member = host.getVM(3);
+
+    // Create properties for locator0
+    final int port0 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final File logFile0 = new File(getUniqueName() + "-locator" + port0 + ".log");
+    
+    // Create properties for locator1
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    while (port == port0) {
+      port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    }
+    final int port1 = port;
+
+    final File logFile1 = new File(getUniqueName() + "-locator" + port1 + ".log");
+    
+    final String locators = host.getHostName() + "[" + port0 + "]," +
+                            host.getHostName() + "[" + port1 + "]";
+    
+    final Properties props = new Properties();
+    props.setProperty(DistributionConfig.LOCATORS_NAME, locators);
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
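+    // mcast-port=0 disables multicast discovery, so membership is formed
+    // solely through the two locators configured above.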
+    
+    // Start locator0 with props.
+    //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
+    locator0.invoke(new CacheSerializableRunnable("Starting first locator on port " + port0) {
+      
+      @Override
+      public void run2() throws CacheException {
+        try {
+          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
+          
+          Locator.startLocatorAndDS(port0, logFile0, props);
+        } catch (IOException e) {
+          fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
+        }
+      }
+    });
+
+    // Start a new member to add it to discovery set of locator0.
+    member.invoke(new CacheSerializableRunnable("Start a member") {
+      
+      @Override
+      public void run2() throws CacheException {
+        disconnectFromDS();
+        TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION-100, Version.CURRENT_ORDINAL);
+        InternalDistributedSystem.connect(props);
+      }
+    });
+
+    // Start locator1 with props.
+    //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port1+"]");
+    locator1.invoke(new CacheSerializableRunnable("Starting second locator on port " + port1) {
+
+      @Override
+      public void run2() throws CacheException {
+        try {
+          TcpServer.TESTVERSION -= 100;
+          TcpServer.OLDTESTVERSION -= 100;
+          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
+          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
+          assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
+          assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
+
+          Locator.startLocatorAndDS(port1, logFile1, props);
+
+          // Start a gossip client to connect to first locator "locator0".
+          fail("this test must be fixed to work with the jgroups replacement");
+          // TODO
+//          final GossipClient client = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1),  500);
+//          client.register("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), 5000, false);
+
+          WaitCriterion ev = new WaitCriterion() {
+            public boolean done() {
+              try {
+                // TODO
+//                Vector members = client.getMembers("mygroup1", 
+//                    new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
+//                return members.size() == 2;
+              }
+              catch (Exception e) {
+                e.printStackTrace();
+                fail("unexpected exception");
+              }
+              return false; // NOTREACHED
+            }
+            public String description() {
+              return null;
+            }
+          };
+          
+          DistributedTestCase.waitForCriterion(ev, 1000, 200, true);
+          fail("this test must be fixed to work with the jgroups replacement");
+          // TODO
+//          Vector members = client.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port0), true, 5000);
+//          Assert.assertEquals(2, members.size());
+//          Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port0)));
+//          Assert.assertTrue(members.contains(new IpAddress(InetAddress.getLocalHost(), port1)));
+
+        } catch (IOException e) {
+          fail("Locator1 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
+        }
+      }
+    });
+
+    // Stop first locator currently running in locator0 VM.
+    locator0.invoke(new CacheSerializableRunnable("Stopping first locator") {
+      
+      @Override
+      public void run2() throws CacheException {
+        Locator.getLocator().stop();
+        disconnectFromDS();
+      }
+    });
+    
+    // Restart first locator in new VM.
+    //props.setProperty(DistributionConfig.START_LOCATOR_NAME, host.getHostName() + "["+port0+"]");
+    locatorRestart0.invoke(new CacheSerializableRunnable("Restarting first locator on port " + port0) {
+      
+      @Override
+      public void run2() throws CacheException {
+        try {
+          TcpServer.TESTVERSION -= 100;
+          TcpServer.OLDTESTVERSION -= 100;
+          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.TESTVERSION, Version.CURRENT_ORDINAL);
+          TcpServer.getGossipVersionMapForTestOnly().put(TcpServer.OLDTESTVERSION, Version.GFE_57.ordinal());
+          assertEquals("Gossip Version and Test version are not same", TcpServer.GOSSIPVERSION, TcpServer.TESTVERSION);
+          assertEquals("Previous Gossip Version and Test version are not same", TcpServer.OLDGOSSIPVERSION, TcpServer.OLDTESTVERSION);
+
+          Locator.startLocatorAndDS(port0, logFile0, props);
+
+          // A new gossip client with new GOSSIPVERSION must be able
+          // to connect with new locator on port1, remote locator.
+          // Reuse locator0 VM.
+          fail("this test must be fixed to work with the jgroups replacement");
+          // TODO
+//          final GossipClient client2 = new GossipClient(new IpAddress(InetAddress.getLocalHost(), port1),  500);
+//          Vector<IpAddress> members = client2.getMembers("mygroup1", new IpAddress(InetAddress.getLocalHost(), port1), true, 5000);
+//          Assert.assertEquals(2, members.size());
+          // As they are coming from other locator, their pid is of other locator process.
+//          getLogWriter().info(members.get(0) + " " + members.get(1));
+
+          // TODO
+//          for (IpAddress ipAddr : members) {
+//            int port = ipAddr.getPort();
+//            String hostname = ipAddr.getIpAddress().getHostAddress();
+//            int pid = ipAddr.getProcessId();
+//            Assert.assertTrue(" " + ipAddr, port == port0 || port == port1);
+//            Assert.assertTrue(" " + ipAddr, hostname.equals(InetAddress.getLocalHost().getHostAddress()));
+//            Assert.assertTrue(" " + ipAddr, pid == locator1.getPid());
+//          }
+
+        } catch (IOException e) {
+          fail("Locator0 start failed with Gossip Version: " + TcpServer.GOSSIPVERSION + "!", e);
+        }
+      }
+    });
+  }
+}