You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/20 16:29:56 UTC

[25/49] incubator-geode git commit: GEODE-430: Fixing races in UpdatePropagationDUnitTest

GEODE-430: Fixing races in UpdatePropagationDUnitTest

This test was failing because of three different race conditions:

1) It was not waiting for updates to make it to the second client,
causing assertion failures. I added a wait.

2) It was occasionally using a connection that had been made to the
server before the server was killed, causing an unexpected "socket
closed" error. I refactored the test so that it no longer uses internal
APIs to do the put; instead, it uses a listener to verify that the event
was not sent back to the client that originated the update.

3) In the PR (partitioned region) version of the test, the PR single hop
code can return different addresses for the same server, resulting in
the pool thinking it has two different servers when it only has one. I
changed the wait condition so that it waits for the server with a given
port to go away.

I also did some cleanup of the test: it now extends JUnit4CacheTestCase,
the fixed-duration pauses (Wait.pause) are removed, the WaitCriterion
polling is replaced with Awaitility, and an almost-duplicate disabled
test method is removed, with its one extra assertion added to the
enabled test method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/884cf13b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/884cf13b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/884cf13b

Branch: refs/heads/feature/GEODE-835-test
Commit: 884cf13b0ccace80d15ca11bcc3162f5169bdf73
Parents: c79b64f
Author: Dan Smith <up...@apache.org>
Authored: Wed May 11 13:04:57 2016 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Fri May 13 15:41:56 2016 -0700

----------------------------------------------------------------------
 .../sockets/UpdatePropagationDUnitTest.java     | 489 +++++--------------
 .../sockets/UpdatePropagationPRDUnitTest.java   |   4 +-
 2 files changed, 136 insertions(+), 357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/884cf13b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
index 69781c4..08d964a 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
@@ -16,44 +16,50 @@
  */
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-import org.junit.Ignore;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
 import com.gemstone.gemfire.cache.CacheWriterException;
 import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
 import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.EndpointManager;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
-import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
 import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.cache.EventID;
 import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.IgnoredException;
 import com.gemstone.gemfire.test.dunit.NetworkUtils;
 import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.core.ConditionTimeoutException;
 
 /**
  * Start client 1
@@ -71,7 +77,8 @@ import com.gemstone.gemfire.test.junit.categories.FlakyTest;
  * The key is to verify that the memberid being used by the client
  * to register with the server is the same across servers
  */
-public class UpdatePropagationDUnitTest extends DistributedTestCase {
+@Category(DistributedTest.class)
+public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
 
   VM server1 = null;
 
@@ -87,13 +94,6 @@ public class UpdatePropagationDUnitTest extends DistributedTestCase {
 
   private static final String REGION_NAME = "UpdatePropagationDUnitTest_region";
 
-  protected static Cache cache = null;
-  
-  /** constructor */
-  public UpdatePropagationDUnitTest(String name) {
-    super(name);
-  }
-
   @Override
   public final void postSetUp() throws Exception {
     disconnectAllFromDS();
@@ -111,382 +111,169 @@ public class UpdatePropagationDUnitTest extends DistributedTestCase {
     //client 2 VM
     client2 = host.getVM(3);
     
-    PORT1 =  ((Integer)server1.invoke(() -> createServerCache())).intValue();
-    PORT2 =  ((Integer)server2.invoke(() -> createServerCache())).intValue();
+    PORT1 =  server1.invoke(() -> createServerCache());
+    PORT2 =  server2.invoke(() -> createServerCache());
 
     client1.invoke(() -> createClientCache(
-      NetworkUtils.getServerHostName(server1.getHost()), new Integer(PORT1),new Integer(PORT2)));
+      NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
     client2.invoke(() -> createClientCache(
-      NetworkUtils.getServerHostName(server1.getHost()), new Integer(PORT1),new Integer(PORT2)));
+      NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2));
     
     IgnoredException.addIgnoredException("java.net.SocketException");
     IgnoredException.addIgnoredException("Unexpected IOException");
   }
 
-  private void createCache(Properties props) throws Exception
-  {
-    DistributedSystem ds = getSystem(props);
-    cache = CacheFactory.create(ds);
-    assertNotNull(cache);
-  }
-
-  static private final String WAIT_PROPERTY =
-    "UpdatePropagationDUnitTest.maxWaitTime";
-  static private final int WAIT_DEFAULT = 60000;
-
-
-  /**
-   * This tests whether the updates are received by the sender or not if
-   * there is an Interest List failover
-   *
-   */
-  @Ignore("Bug 50405")
-  public void DISABLED_testVerifyUpdatesNotReceivedBySender()
-  {
-    final int maxWaitTime = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT).intValue();
-
-    //First create entries on both servers via the two client
-    client1.invoke(() -> createEntriesK1andK2());
-    client2.invoke(() -> createEntriesK1andK2());
-    client1.invoke(() -> registerKeysK1andK2());
-    client2.invoke(() -> registerKeysK1andK2());
-    //Induce fail over of InteretsList Endpoint to Server 2 by killing server1
-    
-    server1.invoke(() -> UpdatePropagationDUnitTest.killServer(new Integer(PORT1)));
-    //Wait for 10 seconds to allow fail over. This would mean that Interest
-    // has failed over to Server2.
-    client1.invoke(new CacheSerializableRunnable("Wait for server on port1 to be dead") {
-      public void run2() throws CacheException
-      {
-        Region r = cache.getRegion(REGION_NAME);
-
-        try {
-          r.put("ping", "pong1"); // in the event there is no live server monitor thread
-        } catch (CacheWriterException itsOk) {}
-
-        String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() != 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
-      }
-    });
-
-    client2.invoke(new CacheSerializableRunnable("Wait for server on port1 to be dead") {
-      public void run2() throws CacheException
-      {
-        Region r = cache.getRegion(REGION_NAME);
-
-        try {
-          r.put("ping", "pong3"); // in the event there is no live server monitor thread
-        } catch (CacheWriterException itsOk) {}
-
-        String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() != 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
-      }
-    });
-
-    //Start Server1 again so that both clients1 & Client 2 will establish connection to server1 too.
-    server1.invoke(() -> UpdatePropagationDUnitTest.startServer(new Integer(PORT1)));
-
-    client1.invoke(new CacheSerializableRunnable("Wait for server on port1 to be dead") {
-      public void run2() throws CacheException
-      {
-        Region r = cache.getRegion(REGION_NAME);
-
-        try {
-          r.put("ping", "pong2"); // in the event there is no live server monitor thread
-        } catch (CacheWriterException itsOk) {}
-
-        String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() == 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
-      }
-    });
-
-    //Do a put on Server1 via Connection object from client1.
-    // Client1 should not receive updated value while client2 should receive
-    client1.invoke(() -> acquireConnectionsAndPutonK1andK2( NetworkUtils.getServerHostName(client1.getHost())));
-    //pause(5000);
-    //Check if both the puts ( on key1 & key2 ) have reached the servers
-    server1.invoke(() -> verifyUpdates());
-    server2.invoke(() -> verifyUpdates());
-    // verify no updates for update originator
-    client1.invoke(() -> verifyNoUpdates());
-
-  }
-
-
   /**
    * This tests whether the updates are received by other clients or not , if there are
    * situation of Interest List fail over
    */
-  @Category(FlakyTest.class) // GEODE-430: time sensitive, random ports, port reuse, thread sleeps (5 seconds), eats exceptions (fixed 1), async actions, waitForCriterion
-  public void testVerifyUpdatesReceivedByOtherClients() {
-    final int maxWaitTime = Integer.getInteger(WAIT_PROPERTY, WAIT_DEFAULT).intValue();
+  @Test
+  public void updatesAreProgegatedAfterFailover() {
     //  First create entries on both servers via the two client
     client1.invoke(() -> createEntriesK1andK2());
     client2.invoke(() -> createEntriesK1andK2());
     client1.invoke(() -> registerKeysK1andK2());
     client2.invoke(() -> registerKeysK1andK2());
     //Induce fail over of InteretsList Endpoint to Server 2 by killing server1
-    server1.invoke(() -> UpdatePropagationDUnitTest.killServer(new Integer(PORT1)));
+    server1.invoke(() -> killServer(new Integer(PORT1)));
     //Wait for 10 seconds to allow fail over. This would mean that Interstist has failed
     // over to Server2.
-    client1.invoke(new CacheSerializableRunnable("Wait for server on port1 to be dead") {
-      public void run2() throws CacheException
-      {
-        Region r = cache.getRegion(REGION_NAME);
-
-        try {
-          r.put("ping", "pong3"); // in the event there is no live server monitor thread
-        } catch (CacheWriterException itsOk) {}
-
-        String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() != 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
-      }
-    });
-    client2.invoke(new CacheSerializableRunnable("Wait for server on port1 to be dead") {
+    final CacheSerializableRunnable waitToDetectDeadServer = new CacheSerializableRunnable("Wait for server on port1 to be dead") {
       public void run2() throws CacheException
       {
-        Region r = cache.getRegion(REGION_NAME);
-
-        try {
-          r.put("ping", "pong3"); // in the event there is no live server monitor thread
-        } catch (CacheWriterException itsOk) {}
+        Region r = getCache().getRegion(REGION_NAME);
 
         String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() != 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
+        final PoolImpl pool = (PoolImpl) PoolManager.find(poolName);
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !hasEndPointWithPort(pool, PORT1));
       }
-    });
+    };
+    client1.invoke(waitToDetectDeadServer);
+    client2.invoke(waitToDetectDeadServer);
 
     //Start Server1 again so that both clients1 & Client 2 will establish connection to server1 too.
-    server1.invoke(() -> UpdatePropagationDUnitTest.startServer(new Integer(PORT1)));
-
-    client1.invoke(new CacheSerializableRunnable("Wait for servers to be alive") {
-      public void run2() throws CacheException
-      {
-        Region r = cache.getRegion(REGION_NAME);
-        String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() == 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
-      }
-    });
+    server1.invoke(() -> startServer(new Integer(PORT1)));
 
-    client2.invoke(new CacheSerializableRunnable("Wait for servers to be alive") {
+    final CacheSerializableRunnable waitToDetectLiveServer = new CacheSerializableRunnable("Wait for servers to be alive") {
       public void run2() throws CacheException
       {
-        Region r = cache.getRegion(REGION_NAME);
+        Region r = getCache().getRegion(REGION_NAME);
         String poolName = r.getAttributes().getPoolName();
-        assertNotNull(poolName);
-        final PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-        assertNotNull(pool);
-        WaitCriterion ev = new WaitCriterion() {
-          public boolean done() {
-            return pool.getConnectedServerCount() == 2;
-          }
-          public String description() {
-            return null;
-          }
-        };
-        Wait.waitForCriterion(ev, maxWaitTime, 200, true);
+        final PoolImpl pool = (PoolImpl) PoolManager.find(poolName);
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> hasEndPointWithPort(pool, PORT1));
       }
-    });
-    
-    Wait.pause(5000);
+    };
+    client1.invoke(waitToDetectLiveServer);
+    client2.invoke(waitToDetectLiveServer);
 
     //Do a put on Server1 via Connection object from client1.
     // Client1 should not receive updated value while client2 should receive
     client1.invoke(() -> acquireConnectionsAndPutonK1andK2( NetworkUtils.getServerHostName(client1.getHost())));
-    Wait.pause(5000);
     //Check if both the puts ( on key1 & key2 ) have reached the servers
     server1.invoke(() -> verifyUpdates());
     server2.invoke(() -> verifyUpdates());
     // verify updates to other client
     client2.invoke(() -> verifyUpdates());
+
+    // verify no updates for update originator
+    client1.invoke(() -> verifySenderUpdateCount());
+  }
+
+  /**
+   * Check to see if a client is connected to an endpoint with a specific port
+   */
+  private boolean hasEndPointWithPort(final PoolImpl pool, final int port) {
+    EndpointManager endpointManager = pool.getEndpointManager();
+    final Set<ServerLocation> servers = endpointManager
+      .getEndpointMap().keySet();
+    return servers.stream().anyMatch(location -> location.getPort() == port);
   }
 
   public void acquireConnectionsAndPutonK1andK2(String host)
   {
-    try {
-      Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-      assertNotNull(r1);
-      String poolName = r1.getAttributes().getPoolName();
-      assertNotNull(poolName);
-      PoolImpl pool = (PoolImpl)PoolManager.find(poolName);
-      assertNotNull(pool);
-
-      Connection conn = pool.acquireConnection(new ServerLocation(host,PORT1));
-      assertNotNull(conn);
-      assertEquals(PORT1, conn.getServer().getPort());
-      ServerRegionProxy srp = new ServerRegionProxy(Region.SEPARATOR+ REGION_NAME, pool);
-      srp.putOnForTestsOnly(conn, "key1", "server-value1", new EventID(new byte[] {1},159632,1), null);
-      srp.putOnForTestsOnly(conn, "key2", "server-value2", new EventID(new byte[] {1},159632,2), null);
-    }
-    catch (Exception ex) {
-      Assert.fail("while setting acquireConnections", ex);
-    }
+    Region r1 = getCache().getRegion(Region.SEPARATOR + REGION_NAME);
+    r1.put("key1", "server-value1");
+    r1.put("key2", "server-value2");
   }
 
-  public static void killServer(Integer port )
+  public void killServer(Integer port )
   {
-    try {
-      Iterator iter = cache.getCacheServers().iterator();
-      if (iter.hasNext()) {
-        CacheServer server = (CacheServer)iter.next();
-        if(server.getPort() == port.intValue()){
-          server.stop();
-        }
-
+    Iterator iter = getCache().getCacheServers().iterator();
+    if (iter.hasNext()) {
+      CacheServer server = (CacheServer)iter.next();
+      if(server.getPort() == port.intValue()){
+        server.stop();
       }
-    }
-    catch (Exception ex) {
-      fail("while killing Server  " + ex);
+
     }
   }
 
-  public static void startServer(Integer port)
+  public void startServer(Integer port) throws IOException
   {
-    try {
-      CacheServer server1 = cache.addCacheServer();
-      server1.setPort(port.intValue());
-      server1.setNotifyBySubscription(true);
-      server1.start();
-    }
-    catch (Exception ex) {
-      fail("while killServer  " + ex);
-    }
+    CacheServer server1 = getCache().addCacheServer();
+    server1.setPort(port.intValue());
+    server1.setNotifyBySubscription(true);
+    server1.start();
   }
 
   /**
    * Creates entries on the server
    *
    */
-  public static void createEntriesK1andK2()
+  public void createEntriesK1andK2()
   {
-    try {
-      Region r1 = cache.getRegion(Region.SEPARATOR+REGION_NAME);
-      assertNotNull(r1);
-      if (!r1.containsKey("key1")) {
-        r1.put("key1", "key-1");
-      }
-      if (!r1.containsKey("key2")) {
-        r1.put("key2", "key-2");
-      }
-      assertEquals(r1.get("key1"), "key-1");
-      if (r1.getAttributes().getPartitionAttributes() == null) {
-        assertEquals(r1.getEntry("key1").getValue(), "key-1");
-        assertEquals(r1.getEntry("key2").getValue(), "key-2");
-      }
-      else {
-        assertEquals(r1.get("key1"), "key-1");
-        assertEquals(r1.get("key2"), "key-2");
-      }
+    Region r1 = getCache().getRegion(Region.SEPARATOR+REGION_NAME);
+    assertNotNull(r1);
+    if (!r1.containsKey("key1")) {
+      r1.put("key1", "key-1");
     }
-    catch (Exception ex) {
-      Assert.fail("failed while createEntriesK1andK2()", ex);
+    if (!r1.containsKey("key2")) {
+      r1.put("key2", "key-2");
+    }
+    assertEquals(r1.get("key1"), "key-1");
+    if (r1.getAttributes().getPartitionAttributes() == null) {
+      assertEquals(r1.getEntry("key1").getValue(), "key-1");
+      assertEquals(r1.getEntry("key2").getValue(), "key-2");
+    }
+    else {
+      assertEquals(r1.get("key1"), "key-1");
+      assertEquals(r1.get("key2"), "key-2");
     }
   }
 
-  public static void createClientCache(String host, Integer port1 , Integer port2 ) throws Exception
+  public void createClientCache(String host, Integer port1 , Integer port2 ) throws Exception
   {
-    int PORT1 = port1.intValue() ;
-    int PORT2 = port2.intValue();
-    Properties props = new Properties();
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
-    new UpdatePropagationDUnitTest("temp").createCache(props);
-    Pool p;
+    ClientCache cache;
     try {
       System.setProperty("gemfire.PoolImpl.DISABLE_RANDOM", "true");
-      p = PoolManager.createFactory()
-        .addServer(host, PORT1)
-        .addServer(host, PORT2)
-        .setSubscriptionEnabled(true)
-        .setSubscriptionRedundancy(-1)
-        .setMinConnections(4)
-        .setSocketBufferSize(1000)
-        .setReadTimeout(2000)
-        // .setRetryInterval(250)
-        // .setRetryAttempts(2)
-        .create("UpdatePropagationDUnitTestPool");
+      int PORT1 = port1.intValue() ;
+      int PORT2 = port2.intValue();
+      Properties props = new Properties();
+      props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+      props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+      ClientCacheFactory cf = new ClientCacheFactory();
+      cf.addPoolServer(host, PORT1)
+      .addPoolServer(host, PORT2)
+      .setPoolSubscriptionEnabled(true)
+      .setPoolSubscriptionRedundancy(-1)
+      .setPoolMinConnections(4)
+      .setPoolSocketBufferSize(1000)
+      .setPoolReadTimeout(2000)
+      .setPoolPingInterval(300);
+       cache = getClientCache(cf);
     } finally {
       System.setProperty("gemfire.PoolImpl.DISABLE_RANDOM", "false");
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
-    cache.createRegion(REGION_NAME, attrs);
-
+    cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+      .addCacheListener(new EventTrackingCacheListener())
+      .create(REGION_NAME);
   }
 
   public Integer createServerCache() throws Exception
   {
-    new UpdatePropagationDUnitTest("temp").createCache(new Properties());
-    RegionAttributes attrs = createCacheServerAttributes(); 
+    Cache cache = getCache();
+    RegionAttributes attrs = createCacheServerAttributes();
     cache.createRegion(REGION_NAME, attrs);
     CacheServer server = cache.addCacheServer();
     assertNotNull(server);
@@ -505,10 +292,10 @@ public class UpdatePropagationDUnitTest extends DistributedTestCase {
     return factory.create();
   }
 
-  public static void registerKeysK1andK2()
+  public void registerKeysK1andK2()
   {
     try {
-      Region r = cache.getRegion(Region.SEPARATOR+ REGION_NAME);
+      Region r = getCache().getRegion(Region.SEPARATOR+ REGION_NAME);
       assertNotNull(r);
       List list = new ArrayList();
       list.add("key1");
@@ -521,25 +308,22 @@ public class UpdatePropagationDUnitTest extends DistributedTestCase {
     }
   }
 
-  public static void verifyNoUpdates()
+  public void verifySenderUpdateCount()
   {
-    try {
-      Region r = cache.getRegion(Region.SEPARATOR+ REGION_NAME);
-      assertNotNull(r);
-      // verify no updates
-      assertEquals("key-1", r.getEntry("key1").getValue());
-      assertEquals("key-2", r.getEntry("key2").getValue());
-    }
-    catch (Exception ex) {
-      Assert.fail("failed while verifyNoUpdates()", ex);
-    }
+    Region r = getCache().getRegion(Region.SEPARATOR+ REGION_NAME);
+    EventTrackingCacheListener listener = (EventTrackingCacheListener) r.getAttributes().getCacheListeners()[0];
+
+    final List<EntryEvent> events = listener.receivedEvents;
+
+    //We only expect to see 1 create and 1 update from the original put
+    assertEquals("Expected only 2 events for key1", 2, events.stream().filter(event -> event.getKey().equals("key1")).count());
+    assertEquals("Expected only 2 events for key2", 2, events.stream().filter(event -> event.getKey().equals("key2")).count());
   }
 
-  public static void verifyUpdates()
+  public void verifyUpdates()
   {
-    try {
-      Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
-      assertNotNull(r);
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+      Region r = getCache().getRegion(Region.SEPARATOR + REGION_NAME);
       // verify updates
       if (r.getAttributes().getPartitionAttributes() == null) {
         assertEquals("server-value2", r.getEntry("key2").getValue());
@@ -549,28 +333,25 @@ public class UpdatePropagationDUnitTest extends DistributedTestCase {
         assertEquals("server-value2", r.get("key2"));
         assertEquals("server-value1", r.get("key1"));
       }
+    });
+  }
+
+  private static class EventTrackingCacheListener extends CacheListenerAdapter {
+    List<EntryEvent> receivedEvents = new ArrayList<>();
+
+    @Override public void afterCreate(final EntryEvent event) {
+      receivedEvents.add(event);
     }
-    catch (Exception ex) {
-      Assert.fail("failed while region", ex);
+
+    @Override public void afterUpdate(final EntryEvent event) {
+      receivedEvents.add(event);
     }
-  }
 
-  public static void closeCache()
-  {
-    if (cache != null && !cache.isClosed()) {
-      cache.close();
-      cache.getDistributedSystem().disconnect();
+    @Override public void afterDestroy(final EntryEvent event) {
+      receivedEvents.add(event);
     }
-  }
 
-  @Override
-  public final void preTearDown() throws Exception {
-    //close client
-    client1.invoke(() -> closeCache());
-    client2.invoke(() -> closeCache());
-    //close server
-    server1.invoke(() -> closeCache());
-    server2.invoke(() -> closeCache());
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/884cf13b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
index 8edac45..589b455 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
@@ -24,9 +24,7 @@ import com.gemstone.gemfire.cache.*;
  */
 public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest {
 
-  public UpdatePropagationPRDUnitTest(String name) {
-    super(name);
-  }
+  @Override
   protected RegionAttributes createCacheServerAttributes()
   {
     AttributesFactory factory = new AttributesFactory();