You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2016/01/20 03:22:29 UTC

[23/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
new file mode 100644
index 0000000..1a0007c
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
@@ -0,0 +1,227 @@
+package com.gemstone.gemfire.cache;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.CacheXml70DUnitTest;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
+
+public class CacheXml70GatewayDUnitTest extends CacheXmlTestCase {
+
+  public CacheXml70GatewayDUnitTest(String name) {
+    super(name);
+  }
+
+  protected String getGemFireVersion() {
+    return CacheXml.VERSION_7_0;
+  }
+  
+  /**
+   * Added to test the scenario of defect #50600.
+   */
+  public void testAsyncEventQueueWithGatewayEventFilter() {
+    getSystem();
+    CacheCreation cache = new CacheCreation();
+            
+    String id = "WBCLChannel";
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(100);
+    factory.setBatchTimeInterval(500);
+    factory.setBatchConflationEnabled(true);
+    factory.setMaximumQueueMemory(200);
+    factory.setDiskSynchronous(true);
+    factory.setParallel(false);
+    factory.setDispatcherThreads(33);
+    factory.addGatewayEventFilter(new MyGatewayEventFilter());
+            
+    AsyncEventListener eventListener = new CacheXml70DUnitTest.MyAsyncEventListener();
+    AsyncEventQueue asyncEventQueue = factory.create(id, eventListener);
+            
+    RegionAttributesCreation attrs = new RegionAttributesCreation();
+    attrs.addAsyncEventQueueId(asyncEventQueue.getId());
+    cache.createRegion("UserRegion", attrs);
+            
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+    
+    Set<AsyncEventQueue> asyncEventQueuesOnCache = c.getAsyncEventQueues();
+    assertTrue("Size of asyncEventQueues should be greater than 0", asyncEventQueuesOnCache.size() > 0);
+            
+    for (AsyncEventQueue asyncEventQueueOnCache : asyncEventQueuesOnCache) {
+      CacheXml70DUnitTest.validateAsyncEventQueue(asyncEventQueue, asyncEventQueueOnCache);
+    }
+  }
+  
+  public void testGatewayReceiver() throws CacheException{
+    getSystem();
+    CacheCreation cache = new CacheCreation();
+    
+    GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
+    gatewayReceiverFactory.setBindAddress("");
+    gatewayReceiverFactory.setStartPort(54321);
+    gatewayReceiverFactory.setEndPort(54331);
+    gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
+    gatewayReceiverFactory.setSocketBufferSize(1500);
+    GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+    gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter1);
+    GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+    gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter2);
+    GatewayReceiver receiver1 = gatewayReceiverFactory.create();
+    try {
+      receiver1.start();
+    }
+    catch (IOException e) {
+      fail("Could not start GatewayReceiver");
+    }
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+    Set<GatewayReceiver> receivers = c.getGatewayReceivers();
+    for(GatewayReceiver receiver : receivers){
+      validateGatewayReceiver(receiver1, receiver);
+    }
+  }
+  
+  public void testParallelGatewaySender() throws CacheException{
+    getSystem();
+    CacheCreation cache = new CacheCreation();
+    
+    GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
+    gatewaySenderFactory.setParallel(true);
+    gatewaySenderFactory.setDispatcherThreads(13);
+    gatewaySenderFactory.setManualStart(true);
+    gatewaySenderFactory.setSocketBufferSize(1234);
+    gatewaySenderFactory.setSocketReadTimeout(1050);          
+    gatewaySenderFactory.setBatchConflationEnabled(false);
+    gatewaySenderFactory.setBatchSize(88);
+    gatewaySenderFactory.setBatchTimeInterval(9);           
+    gatewaySenderFactory.setPersistenceEnabled(true);          
+    gatewaySenderFactory.setDiskStoreName("LNSender");  
+    gatewaySenderFactory.setDiskSynchronous(true);
+    gatewaySenderFactory.setMaximumQueueMemory(211);           
+    gatewaySenderFactory.setAlertThreshold(35);
+    
+    GatewayEventFilter myeventfilter1 = new MyGatewayEventFilter1();
+    gatewaySenderFactory.addGatewayEventFilter(myeventfilter1);
+    GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+    gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter1);
+    GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+    gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter2);
+    GatewaySender parallelGatewaySender = gatewaySenderFactory.create("LN", 2);
+    
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+    Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
+    for(GatewaySender sender : sendersOnCache){
+      assertEquals(true, sender.isParallel());
+      validateGatewaySender(parallelGatewaySender, sender);
+    }
+  }
+  
+  public void testSerialGatewaySender() throws CacheException{
+    getSystem();
+    CacheCreation cache = new CacheCreation();
+    GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
+    gatewaySenderFactory.setParallel(false);
+    gatewaySenderFactory.setManualStart(true);
+    gatewaySenderFactory.setSocketBufferSize(124);
+    gatewaySenderFactory.setSocketReadTimeout(1000);          
+    gatewaySenderFactory.setBatchConflationEnabled(false);
+    gatewaySenderFactory.setBatchSize(100);
+    gatewaySenderFactory.setBatchTimeInterval(10);           
+    gatewaySenderFactory.setPersistenceEnabled(true);          
+    gatewaySenderFactory.setDiskStoreName("LNSender"); 
+    gatewaySenderFactory.setDiskSynchronous(true);
+    gatewaySenderFactory.setMaximumQueueMemory(200);           
+    gatewaySenderFactory.setAlertThreshold(30);
+    
+    GatewayEventFilter myeventfilter1 = new MyGatewayEventFilter1();
+    gatewaySenderFactory.addGatewayEventFilter(myeventfilter1);
+    GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+    gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter1);
+    GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+    gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter2);
+    GatewaySender serialGatewaySender = gatewaySenderFactory.create("LN", 2);
+    
+    RegionAttributesCreation attrs = new RegionAttributesCreation();
+    attrs.addGatewaySenderId(serialGatewaySender.getId());
+    cache.createRegion("UserRegion", attrs);
+    
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+    Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
+    for(GatewaySender sender : sendersOnCache){
+      assertEquals(false, sender.isParallel());
+      validateGatewaySender(serialGatewaySender, sender);
+    }
+  }
+  
+  public static class MyGatewayEventFilter implements GatewayEventFilter, Declarable {
+    public void afterAcknowledgement(GatewayQueueEvent event) {
+    }
+    public boolean beforeEnqueue(GatewayQueueEvent event) {
+      return true;
+    }
+    public boolean beforeTransmit(GatewayQueueEvent event) {
+      return true;
+    }
+    public void close() {
+    }
+    public void init(Properties properties) {
+    }
+  }
+
+  static void validateGatewayReceiver(GatewayReceiver receiver1, GatewayReceiver gatewayReceiver) {
+    assertEquals(receiver1.getHost(), gatewayReceiver.getHost());
+    assertEquals(receiver1.getStartPort(), gatewayReceiver.getStartPort());
+    assertEquals(receiver1.getEndPort(), gatewayReceiver.getEndPort());
+    assertEquals(receiver1.getMaximumTimeBetweenPings(), gatewayReceiver.getMaximumTimeBetweenPings());
+    assertEquals(receiver1.getSocketBufferSize(), gatewayReceiver.getSocketBufferSize());
+    assertEquals(receiver1.getGatewayTransportFilters().size(), gatewayReceiver.getGatewayTransportFilters().size());
+  } 
+
+  static void validateGatewaySender(GatewaySender sender1, GatewaySender gatewaySender) {
+    assertEquals(sender1.getId(), gatewaySender.getId());
+    assertEquals(sender1.getRemoteDSId(), gatewaySender.getRemoteDSId());
+    assertEquals(sender1.isParallel(), gatewaySender.isParallel());
+    assertEquals(sender1.isBatchConflationEnabled(), gatewaySender.isBatchConflationEnabled());
+    assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
+    assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
+    assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
+    assertEquals(sender1.getDiskStoreName(),gatewaySender.getDiskStoreName());
+    assertEquals(sender1.isDiskSynchronous(),gatewaySender.isDiskSynchronous());
+    assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
+    assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
+    assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender.getGatewayEventFilters().size());
+    assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender.getGatewayTransportFilters().size());
+    
+    boolean isParallel = sender1.isParallel();
+    if (isParallel) {
+      assertTrue("sender should be instanceof Creation", sender1 instanceof ParallelGatewaySenderCreation);
+    } else {
+      assertTrue("sender should be instanceof Creation", sender1 instanceof SerialGatewaySenderCreation);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
new file mode 100644
index 0000000..54a84d6
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
@@ -0,0 +1,61 @@
+package com.gemstone.gemfire.cache;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+
+public class CacheXml80GatewayDUnitTest extends CacheXmlTestCase {
+
+  public CacheXml80GatewayDUnitTest(String name) {
+    super(name);
+  }
+
+  protected String getGemFireVersion() {
+    return CacheXml.VERSION_8_0;
+  }
+  
+  public void testGatewayReceiverWithManualStartTRUE() throws CacheException{
+    //getSystem();
+    CacheCreation cache = new CacheCreation();
+    
+    GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
+    gatewayReceiverFactory.setBindAddress("");
+    gatewayReceiverFactory.setStartPort(54321);
+    gatewayReceiverFactory.setEndPort(54331);
+    gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
+    gatewayReceiverFactory.setSocketBufferSize(1500);
+    gatewayReceiverFactory.setManualStart(true);
+    GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+    gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter1);
+    GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+    gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter2);
+    GatewayReceiver receiver1 = gatewayReceiverFactory.create();
+    try {
+      receiver1.start();
+    }
+    catch (IOException e) {
+      fail("Could not start GatewayReceiver");
+    }
+    testXml(cache);
+    Cache c = getCache();
+    assertNotNull(c);
+    Set<GatewayReceiver> receivers = c.getGatewayReceivers();
+    for(GatewayReceiver receiver : receivers){
+      validateGatewayReceiver(receiver1, receiver);
+    }
+  }
+
+  protected void validateGatewayReceiver(GatewayReceiver receiver1,
+      GatewayReceiver gatewayReceiver){
+    CacheXml70GatewayDUnitTest.validateGatewayReceiver(receiver1, gatewayReceiver);
+    assertEquals(receiver1.isManualStart(), gatewayReceiver.isManualStart());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
new file mode 100755
index 0000000..4ee46d1
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.codeAnalysis;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.codeAnalysis.decode.CompiledClass;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.gemstone.gemfire.util.test.TestUtil;
+
+/**
+ * @author bruces
+ * 
+ */
+@Category(IntegrationTest.class)
+public class AnalyzeWANSerializablesJUnitTest extends AnalyzeSerializablesJUnitTest {
+  
+  @Before
+  public void loadClasses() throws Exception {
+    if (classes.size() > 0) {
+      return;
+    }
+    System.out.println("loadClasses starting");
+    List<String> excludedClasses = loadExcludedClasses(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "excludedClasses.txt")));
+    List<String> openBugs = loadOpenBugs(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "openBugs.txt")));
+    excludedClasses.addAll(openBugs);
+    
+    String cp = System.getProperty("java.class.path");
+    System.out.println("java classpath is " + cp);
+    System.out.flush();
+    String[] entries = cp.split(File.pathSeparator);
+    String buildDirName =
+         "gemfire-wan"+File.separatorChar
+        +"build"+File.separatorChar
+        +"classes"+File.separatorChar
+        +"main";
+    String buildDir = null;
+    
+    for (int i=0; i<entries.length  &&  buildDir==null; i++) {
+      System.out.println("examining '" + entries[i] + "'");
+      System.out.flush();
+      if (entries[i].endsWith(buildDirName)) {
+        buildDir = entries[i];
+      }
+    }
+    if (buildDir != null) {
+      System.out.println("loading class files from " + buildDir);
+      System.out.flush();
+      long start = System.currentTimeMillis();
+      loadClassesFromBuild(new File(buildDir), excludedClasses);
+      long finish = System.currentTimeMillis();
+      System.out.println("done loading " + classes.size() + " classes.  elapsed time = "
+          + (finish-start)/1000 + " seconds");
+    }
+    else {
+      fail("unable to find WAN classes");
+    }
+  }
+  
+  @AfterClass
+  public static void cleanup() {
+    if (classes != null) {
+      classes.clear();
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
new file mode 100644
index 0000000..cb34278
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
@@ -0,0 +1,955 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.gopivotal.com/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.SerializableCallable;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * @author Shobhit Agarwal
+ * @since 7.0.1
+ */
+public class UpdateVersionDUnitTest extends DistributedTestCase {
+
+  protected static final String regionName = "testRegion";
+  protected static Cache cache;
+  private static Set<ExpectedException>expectedExceptions = new HashSet<ExpectedException>();
+
+  
+  
+  public UpdateVersionDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+    closeCache();
+    invokeInEveryVM(new SerializableRunnable() { public void run() {
+      closeCache();
+     } });
+  }
+  
+  public void testUpdateVersionAfterCreateWithSerialSender() {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); // server1 site1
+    VM vm1 = host.getVM(1); // server2 site1
+
+    VM vm2 = host.getVM(2); // server1 site2
+    VM vm3 = host.getVM(3); // server2 site2
+
+    final String key = "key-1";
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort});
+    vm0.invoke(UpdateVersionDUnitTest.class, "createSender", new Object[] { "ln1", 2, false, 10, 1, false, false, null, true });
+    
+    vm0.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "ln1", 1, 1});
+    vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+    vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+    vm2.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+    vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort});
+    vm3.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});    
+    
+    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {
+      
+      @Override
+      public Object call() throws CacheException {
+        Cache cache = CacheFactory.getAnyInstance();
+        Region region = cache.getRegion(regionName);
+        assertTrue(region instanceof PartitionedRegion);
+
+        region.put(key, "value-1");
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        // Create a duplicate entry version tag from stamp with newer
+        // time-stamp.
+        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+        VersionTag tag = VersionTag.create(memberId);
+
+        int entryVersion = stamp.getEntryVersion()-1;
+        int dsid = stamp.getDistributedSystemId();
+        long time = System.currentTimeMillis();
+
+        tag.setEntryVersion(entryVersion);
+        tag.setDistributedSystemId(dsid);
+        tag.setVersionTimeStamp(time);
+        tag.setIsRemoteForTesting();
+
+        EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
+            entry.getKey(), "value-2");
+
+        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+        // Verify the new stamp
+        entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        stamp = regionEntry.getVersionStamp();
+        assertEquals(
+            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+            time, stamp.getVersionTimeStamp());
+        assertEquals(++entryVersion, stamp.getEntryVersion());
+        assertEquals(dsid, stamp.getDistributedSystemId());
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+      
+      @Override
+      public Object call() throws Exception {
+        
+        Cache cache = CacheFactory.getAnyInstance();
+        final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+
+        // wait for entry to be received
+        WaitCriterion wc = new WaitCriterion() {
+          public boolean done() {
+            Entry<?,?> entry = null;
+            try {
+              entry = region.getDataStore().getEntryLocally(0, key, false, false, false);
+            } catch (EntryNotFoundException e) {
+              // expected
+            } catch (ForceReattemptException e) {
+              // expected
+            } catch (PRLocallyDestroyedException e) {
+              throw new RuntimeException("unexpected exception", e);
+            }
+            if (entry != null) {
+              getLogWriter().info("found entry " + entry);
+            }
+            return (entry != null);
+          }
+
+          public String description() {
+            return "Expected "+key+" to be received on remote WAN site";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        wc = new WaitCriterion() {
+          public boolean done() {
+            Entry entry = region.getEntry(key);
+            assertTrue(entry instanceof EntrySnapshot);
+            RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+          }
+          public String description() {
+            return "waiting for timestamp to be updated";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        Entry entry = region.getEntry(key);
+        assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
+        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+  }
+
+  public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); // server1 site1
+    VM vm1 = host.getVM(1); // server2 site1
+
+    VM vm2 = host.getVM(2); // server1 site2
+    VM vm3 = host.getVM(3); // server2 site2
+
+    final String key = "key-1";
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort});
+    vm0.invoke(UpdateVersionDUnitTest.class, "createSender", new Object[] { "ln1", 2, false, 10, 1, false, false, null, true });
+    
+    vm0.invoke(UpdateVersionDUnitTest.class, "createReplicatedRegion", new Object[] {regionName, "ln1"});
+    vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+    vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+    vm2.invoke(UpdateVersionDUnitTest.class, "createReplicatedRegion", new Object[] {regionName, ""});
+    vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort });
+    vm3.invoke(UpdateVersionDUnitTest.class, "createReplicatedRegion", new Object[] {regionName, ""});    
+    
+    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {
+      
+      @Override
+      public Object call() throws CacheException {
+        Cache cache = CacheFactory.getAnyInstance();
+        Region region = cache.getRegion(regionName);
+        assertTrue(region instanceof DistributedRegion);
+
+        region.put(key, "value-1");
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof NonTXEntry);
+        RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        // Create a duplicate entry version tag from stamp with newer
+        // time-stamp.
+        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+        VersionTag tag = VersionTag.create(memberId);
+
+        int entryVersion = stamp.getEntryVersion()-1;
+        int dsid = stamp.getDistributedSystemId();
+        long time = System.currentTimeMillis();
+
+        tag.setEntryVersion(entryVersion);
+        tag.setDistributedSystemId(dsid);
+        tag.setVersionTimeStamp(time);
+        tag.setIsRemoteForTesting();
+
+        EntryEventImpl event = createNewEvent((DistributedRegion) region, tag,
+            entry.getKey(), "value-2");
+
+        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+        // Verify the new stamp
+        entry = region.getEntry(key);
+        assertTrue(entry instanceof NonTXEntry);
+        regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+        stamp = regionEntry.getVersionStamp();
+        assertEquals(
+            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+            time, stamp.getVersionTimeStamp());
+        assertEquals(entryVersion+1, stamp.getEntryVersion());
+        assertEquals(dsid, stamp.getDistributedSystemId());
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+      
+      @Override
+      public Object call() throws Exception {
+        
+        Cache cache = CacheFactory.getAnyInstance();
+        final Region region = cache.getRegion(regionName);
+
+        // wait for entry to be received
+        WaitCriterion wc = new WaitCriterion() {
+          public boolean done() {
+            return (region.getEntry(key) != null);
+          }
+
+          public String description() {
+            return "Expected key-1 to be received on remote WAN site";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        wc = new WaitCriterion() {
+          public boolean done() {
+            Entry entry = region.getEntry(key);
+            assertTrue(entry instanceof NonTXEntry);
+            RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+          }
+          public String description() {
+            return "waiting for timestamp to be updated";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof NonTXEntry);
+        RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+  }
+
+  public void testUpdateVersionAfterCreateWithParallelSender() {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); // server1 site1
+    VM vm1 = host.getVM(1); // server2 site1
+
+    VM vm2 = host.getVM(2); // server1 site2
+    VM vm3 = host.getVM(3); // server2 site2
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    final String key = "key-1";
+
+    vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort});
+    vm0.invoke(UpdateVersionDUnitTest.class, "createSender", new Object[] { "ln1", 2, true, 10, 1, false, false, null, true });
+    
+    vm0.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "ln1", 1, 1});
+    vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+    vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+    
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+    vm2.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+    vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort});
+    vm3.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+    
+    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {
+      
+      @Override
+      public Object call() throws CacheException {
+        Cache cache = CacheFactory.getAnyInstance();
+        Region region = cache.getRegion(regionName);
+        assertTrue(region instanceof PartitionedRegion);
+
+        region.put(key, "value-1");
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        // Create a duplicate entry version tag from stamp with newer
+        // time-stamp.
+        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+        VersionTag tag = VersionTag.create(memberId);
+
+        int entryVersion = stamp.getEntryVersion()-1;
+        int dsid = stamp.getDistributedSystemId();
+        long time = System.currentTimeMillis();
+
+        tag.setEntryVersion(entryVersion);
+        tag.setDistributedSystemId(dsid);
+        tag.setVersionTimeStamp(time);
+        tag.setIsRemoteForTesting();
+
+        EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
+            entry.getKey(), "value-2");
+
+        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+        // Verify the new stamp
+        entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        stamp = regionEntry.getVersionStamp();
+        assertEquals(
+            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+            time, stamp.getVersionTimeStamp());
+        assertEquals(++entryVersion, stamp.getEntryVersion());
+        assertEquals(dsid, stamp.getDistributedSystemId());
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+      
+      @Override
+      public Object call() throws Exception {
+        
+        Cache cache = CacheFactory.getAnyInstance();
+        final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+
+        // wait for entry to be received
+        WaitCriterion wc = new WaitCriterion() {
+          public boolean done() {
+            Entry<?,?> entry = null;
+            try {
+              entry = region.getDataStore().getEntryLocally(0, key, false, false, false);
+            } catch (EntryNotFoundException e) {
+              // expected
+            } catch (ForceReattemptException e) {
+              // expected
+            } catch (PRLocallyDestroyedException e) {
+              throw new RuntimeException("unexpected exception", e);
+            }
+            if (entry != null) {
+              getLogWriter().info("found entry " + entry);
+            }
+            return (entry != null);
+          }
+
+          public String description() {
+            return "Expected key-1 to be received on remote WAN site";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        wc = new WaitCriterion() {
+          public boolean done() {
+            Entry entry = region.getEntry(key);
+            assertTrue(entry instanceof EntrySnapshot);
+            RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+          }
+          public String description() {
+            return "waiting for timestamp to be updated";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+  }
+
+  public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0); // server1 site1
+    VM vm1 = host.getVM(1); // server2 site1
+
+    VM vm2 = host.getVM(2); // server1 site2
+    VM vm3 = host.getVM(3); // server2 site2
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    final String key = "key-1";
+
+    vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort });
+    vm0.invoke(UpdateVersionDUnitTest.class, "createConcurrentSender", new Object[] { "ln1", 2, false, 10, 2, false, false, null, true, 2 });
+    
+    vm0.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "ln1", 1, 1});
+    vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+    vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+    
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+    vm2.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+    vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort });
+    vm3.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});    
+    
+    final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {
+      
+      @Override
+      public Object call() throws CacheException {
+        Cache cache = CacheFactory.getAnyInstance();
+        Region region = cache.getRegion(regionName);
+        assertTrue(region instanceof PartitionedRegion);
+
+        region.put(key, "value-1");
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        // Create a duplicate entry version tag from stamp with newer
+        // time-stamp.
+        VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+        VersionTag tag = VersionTag.create(memberId);
+
+        int entryVersion = stamp.getEntryVersion()-1;
+        int dsid = stamp.getDistributedSystemId();
+        long time = System.currentTimeMillis();
+
+        tag.setEntryVersion(entryVersion);
+        tag.setDistributedSystemId(dsid);
+        tag.setVersionTimeStamp(time);
+        tag.setIsRemoteForTesting();
+
+        EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
+            entry.getKey(), "value-2");
+
+        ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+        // Verify the new stamp
+        entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        stamp = regionEntry.getVersionStamp();
+        assertEquals(
+            "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+            time, stamp.getVersionTimeStamp());
+        assertEquals(++entryVersion, stamp.getEntryVersion());
+        assertEquals(dsid, stamp.getDistributedSystemId());
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+      
+      @Override
+      public Object call() throws Exception {
+        
+        Cache cache = CacheFactory.getAnyInstance();
+        final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+
+        // wait for entry to be received
+        WaitCriterion wc = new WaitCriterion() {
+          public boolean done() {
+            Entry<?,?> entry = null;
+            try {
+              entry = region.getDataStore().getEntryLocally(0, key, false, false, false);
+            } catch (EntryNotFoundException e) {
+              // expected
+            } catch (ForceReattemptException e) {
+              // expected
+            } catch (PRLocallyDestroyedException e) {
+              throw new RuntimeException("unexpected exception", e);
+            }
+            if (entry != null) {
+              getLogWriter().info("found entry " + entry);
+            }
+            return (entry != null);
+          }
+
+          public String description() {
+            return "Expected key-1 to be received on remote WAN site";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        wc = new WaitCriterion() {
+          public boolean done() {
+            Entry entry = region.getEntry(key);
+            assertTrue(entry instanceof EntrySnapshot);
+            RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+            return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+          }
+          public String description() {
+            return "waiting for timestamp to be updated";
+          }
+        };
+        DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+        Entry entry = region.getEntry(key);
+        assertTrue(entry instanceof EntrySnapshot);
+        RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+        VersionStamp stamp = regionEntry.getVersionStamp();
+
+        return stamp.asVersionTag();
+      }
+    });
+
+    assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+  }
+  
+  
+  private EntryEventImpl createNewEvent(LocalRegion region, VersionTag tag, Object key, Object value) {
+    EntryEventImpl updateEvent = EntryEventImpl.createVersionTagHolder(tag);
+    updateEvent.setOperation(Operation.UPDATE);
+    updateEvent.setRegion(region);
+    if (region instanceof PartitionedRegion) {
+      updateEvent.setKeyInfo(((PartitionedRegion)region).getKeyInfo(key));
+    } else {
+      updateEvent.setKeyInfo(new KeyInfo(key, value, null));
+    }
+    updateEvent.setNewValue(value);
+    updateEvent.setGenerateCallbacks(true);
+    updateEvent.distributedMember = region.getSystem().getDistributedMember();
+    updateEvent.setNewEventId(region.getSystem());
+    return updateEvent;
+  }
+
+  /*
+   * Helper Methods
+   */
+
+  private static void createCache(Integer locPort) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
+    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+    props.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds); 
+    ExpectedException ex = new ExpectedException("could not get remote locator information for remote site");
+    cache.getLogger().info(ex.getAddString());
+    expectedExceptions.add(ex);
+    ex = new ExpectedException("Pool ln1 is not available");
+    cache.getLogger().info(ex.getAddString());
+    expectedExceptions.add(ex);
+  }
+  
+  private static void closeCache() {
+    if (cache != null && !cache.isClosed()) {
+      for (ExpectedException expectedException: expectedExceptions) {
+        cache.getLogger().info(expectedException.getRemoveString());
+      }
+      expectedExceptions.clear();
+      cache.getDistributedSystem().disconnect();
+      cache.close();
+    }
+    cache = null;
+  }
+
+  public static void createSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+      boolean isManualStart) {
+    File persistentDirectory = new File(dsName + "_disk_"
+        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File[] dirs1 = new File[] { persistentDirectory };
+    if (isParallel) {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setParallel(true);
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory) gateway)
+          .setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+            .getName());
+      } else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      gateway.create(dsName, remoteDsId);
+
+    } else {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory) gateway)
+          .setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+            .getName());
+      } else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.create(dsName, remoteDsId);
+    }
+  }
+
+  
+  public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(totalNumBuckets);
+    pfact.setRedundantCopies(redundantCopies);
+    pfact.setRecoveryDelay(0);
+    fact.setPartitionAttributes(pfact.create());
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+
+  public static void createReplicatedRegion(String regionName, String senderIds){
+    AttributesFactory fact = new AttributesFactory();
+    if(senderIds!= null){
+      StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+      while (tokenizer.hasMoreTokens()){
+        String senderId = tokenizer.nextToken();
+//        GatewaySender sender = cache.getGatewaySender(senderId);
+//        assertNotNull(sender);
+        fact.addGatewaySenderId(senderId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.REPLICATE);
+    fact.setScope(Scope.DISTRIBUTED_ACK);
+    Region r = cache.createRegionFactory(fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+
+  public static void waitForSenderRunningState(String senderId){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    final GatewaySender sender = getGatewaySenderById(senders, senderId);
+    
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (sender != null && sender.isRunning()) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected sender isRunning state to be true but is false";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 300000, 500, true);
+  }
+
+  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+    props.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+    test.getSystem(props);
+    return port;
+  }
+
+  public static void createConcurrentSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      GatewayEventFilter filter, boolean isManulaStart, int concurrencyLevel) {
+    File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File [] dirs1 = new File[] {persistentDirectory};
+    
+    if(isParallel) {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setParallel(true);
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManulaStart);
+      ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      if(isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+      }
+      else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      gateway.create(dsName, remoteDsId);
+      
+    }else {
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManulaStart);
+      ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        gateway.addGatewayEventFilter(filter);
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      if(isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+      }
+      else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setDispatcherThreads(concurrencyLevel);
+      gateway.create(dsName, remoteDsId);
+    }
+  }
+
+  public static int createReceiver(int locPort) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);    
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Test " + test.getName() + " failed to start GatewayRecevier on port " + port);
+    }
+    return port;
+  }
+
+  public static void startSender(String senderId){
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        sender = s;
+        break;
+      }
+    }
+    sender.start();
+  }
+
+  protected static class MyLocatorCallback extends
+      LocatorDiscoveryCallbackAdapter {
+
+    private final Set discoveredLocators = new HashSet();
+
+    private final Set removedLocators = new HashSet();
+
+    public synchronized void locatorsDiscovered(List locators) {
+      discoveredLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public synchronized void locatorsRemoved(List locators) {
+      removedLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public boolean waitForDiscovery(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(discoveredLocators, locator, time);
+    }
+
+    public boolean waitForRemove(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(removedLocators, locator, time);
+    }
+
+    private synchronized boolean waitFor(Set set, InetSocketAddress locator,
+        long time) throws InterruptedException {
+      long remaining = time;
+      long endTime = System.currentTimeMillis() + time;
+      while (!set.contains(locator) && remaining >= 0) {
+        wait(remaining);
+        remaining = endTime - System.currentTimeMillis();
+      }
+      return set.contains(locator);
+    }
+
+    public synchronized Set getDiscovered() {
+      return new HashSet(discoveredLocators);
+    }
+
+    public synchronized Set getRemoved() {
+      return new HashSet(removedLocators);
+    }
+  }
+
+  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
+    for(GatewaySender s : senders){
+      if(s.getId().equals(senderId)){
+        return s;
+      }
+    }
+    //if none of the senders matches with the supplied senderid, return null
+    return null;
+  }
+
+  public static Integer createFirstLocatorWithDSId(int dsId) {
+    UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME,"0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, ""+dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+    props.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+}
\ No newline at end of file