You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/08/31 01:26:09 UTC

[32/47] geode git commit: GEODE-3519 servers are not locking on some ops initiated by clients

GEODE-3519 servers are not locking on some ops initiated by clients

While investigating dlocktoken cleanup I discovered that a number of
operations coming from a client were not locking entries for Scope.GLOBAL
regions on servers.  Only put, putIfAbsent and variants of replace were
obtaining locks.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/2637bd87
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/2637bd87
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/2637bd87

Branch: refs/heads/feature/GEODE-3543
Commit: 2637bd8794b76e048f2fb13007ad0a50842059d8
Parents: 4ac4600
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Aug 29 14:07:52 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Aug 29 14:08:53 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/DistributedRegion.java |  49 +++++++
 .../geode/cache30/ClientServerCCEDUnitTest.java | 129 +++++++++++++++++--
 2 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/2637bd87/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d9cf1ed..e882ed1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -48,6 +48,7 @@ import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.EntryExistsException;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.LossAction;
 import org.apache.geode.cache.MembershipAttributes;
@@ -1500,6 +1501,54 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
   }
 
   @Override
+  public void basicBridgeRemove(Object key, Object expectedOldValue, Object p_callbackArg,
+      ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent)
+      throws TimeoutException, EntryNotFoundException, CacheWriterException {
+    Lock lock = getDistributedLockIfGlobal(key);
+    try {
+      super.basicBridgeRemove(key, expectedOldValue, p_callbackArg, memberId, fromClient,
+          clientEvent);
+    } finally {
+      if (lock != null) {
+        logger.debug("releasing distributed lock on {}", key);
+        lock.unlock();
+        getLockService().freeResources(key);
+      }
+    }
+  }
+
+  @Override
+  public void basicBridgeDestroy(Object key, Object p_callbackArg, ClientProxyMembershipID memberId,
+      boolean fromClient, EntryEventImpl clientEvent)
+      throws TimeoutException, EntryNotFoundException, CacheWriterException {
+    Lock lock = getDistributedLockIfGlobal(key);
+    try {
+      super.basicBridgeDestroy(key, p_callbackArg, memberId, fromClient, clientEvent);
+    } finally {
+      if (lock != null) {
+        logger.debug("releasing distributed lock on {}", key);
+        lock.unlock();
+        getLockService().freeResources(key);
+      }
+    }
+  }
+
+  @Override
+  public void basicBridgeInvalidate(Object key, Object p_callbackArg,
+      ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent)
+      throws TimeoutException, EntryNotFoundException, CacheWriterException {
+    Lock lock = getDistributedLockIfGlobal(key);
+    try {
+      super.basicBridgeInvalidate(key, p_callbackArg, memberId, fromClient, clientEvent);
+    } finally {
+      if (lock != null) {
+        logger.debug("releasing distributed lock on {}", key);
+        lock.unlock();
+      }
+    }
+  }
+
+  @Override
   protected void basicDestroy(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
       throws EntryNotFoundException, CacheWriterException, TimeoutException {
     // disallow local destruction for mirrored keysvalues regions

http://git-wip-us.apache.org/repos/asf/geode/blob/2637bd87/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
index b4224e0..b9301dd 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.cache30;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.distributed.ConfigurationProperties.*;
 import static org.junit.Assert.*;
 
@@ -27,23 +28,30 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.ExpirationAction;
 import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.locks.DLockService;
+import org.apache.geode.distributed.internal.locks.DistributedLockStats;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionFactory;
@@ -98,6 +106,106 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
   }
 
 
+  /**
+   * GEODE-3519 servers are not locking on remove or invalidate ops initiated by clients
+   * <p>
+   * This test sets up two servers each with a client attached. The clients perform operations on
+   * the same key in a region which, in the servers, has Scope.GLOBAL. There should be no conflation
+   * and each operation should obtain a lock.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testClientEventsAreNotConflatedByGlobalRegionOnServer() throws Exception {
+    VM[] serverVMs = new VM[] {Host.getHost(0).getVM(0), Host.getHost(0).getVM(1)};
+    VM[] clientVMs = new VM[] {Host.getHost(0).getVM(2), Host.getHost(0).getVM(3)};
+    final String name = this.getUniqueName() + "Region";
+
+    int serverPorts[] = new int[] {createServerRegion(serverVMs[0], name, true, Scope.GLOBAL),
+        createServerRegion(serverVMs[1], name, true, Scope.GLOBAL)};
+
+    for (int i = 0; i < clientVMs.length; i++) {
+      createClientRegion(clientVMs[i], name, serverPorts[i], false,
+          ClientRegionShortcut.CACHING_PROXY, false);
+    }
+
+    getBlackboard().initBlackboard();
+
+    final int numIterations = 500;
+
+    AsyncInvocation[] asyncInvocations = new AsyncInvocation[clientVMs.length];
+    for (int i = 0; i < clientVMs.length; i++) {
+      final String clientGateName = "client" + i + "Ready";
+      asyncInvocations[i] = clientVMs[i].invokeAsync("doOps Thread", () -> {
+        doOps(name, numIterations, clientGateName);
+      });
+      getBlackboard().waitForGate(clientGateName, 30, SECONDS);
+    }
+
+    getBlackboard().signalGate("proceed");
+
+    for (int i = 0; i < asyncInvocations.length; i++) {
+      asyncInvocations[i].join();
+    }
+
+    for (int i = 0; i < serverVMs.length; i++) {
+      serverVMs[i].invoke("verify thread", () -> {
+        verifyServerState(name, numIterations);
+      });
+    }
+  }
+
+  private void verifyServerState(String name, int numIterations) {
+    Cache cache = CacheFactory.getAnyInstance();
+    DistributedRegion region = (DistributedRegion) cache.getRegion(name);
+    CachePerfStats stats = region.getCachePerfStats();
+    assertEquals(0, stats.getConflatedEventsCount());
+
+    DLockService dLockService = (DLockService) region.getLockService();
+    DistributedLockStats distributedLockStats = dLockService.getStats();
+    assertEquals(numIterations, distributedLockStats.getLockReleasesCompleted());
+  }
+
+  private void doOps(String name, int numIterations, String clientGateName) {
+    ClientCache cache = ClientCacheFactory.getAnyInstance();
+    Region region = cache.getRegion(name);
+    getBlackboard().signalGate(clientGateName);
+    try {
+      getBlackboard().waitForGate("proceed", 30, SECONDS);
+    } catch (TimeoutException | InterruptedException e) {
+      throw new RuntimeException("failed to start", e);
+    }
+    String key = "lockingKey";
+    String value = "lockingValue";
+    for (int j = 0; j < numIterations; j++) {
+      int operation = j % 5;
+      switch (operation) {
+        case 0:
+          region.remove(key);
+          break;
+        case 1:
+          region.putIfAbsent(key, value);
+          break;
+        case 2:
+          region.invalidate(key);
+          break;
+        case 3:
+          region.replace(key, value);
+          break;
+        case 4:
+          region.replace(key, value, value);
+          break;
+//        case 5:
+//          remove(k,v) can't be included in this test as it checks the old value
+//          against what is in the local cache before sending the operation to the server
+//          region.remove(key, value);
+//          break;
+        default:
+          throw new RuntimeException("" + j + " % 5 == " + operation + "?");
+      }
+    }
+  }
+
   @Test
   public void testClientDoesNotExpireEntryPrematurely() throws Exception {
     Host host = Host.getHost(0);
@@ -144,8 +252,8 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
 
         final long expirationTime = System.currentTimeMillis() + (expirationSeconds * 1000);
 
-        Awaitility.await("waiting for object to expire")
-            .atMost(expirationSeconds * 2, TimeUnit.SECONDS).until(() -> {
+        Awaitility.await("waiting for object to expire").atMost(expirationSeconds * 2, SECONDS)
+            .until(() -> {
               return expirationTimeMillis[0] != null;
             });
 
@@ -246,7 +354,7 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
       vm1.invoke(() -> {
         PRTombstoneMessageObserver mo =
             (PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
-        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+        Awaitility.await().atMost(60, SECONDS).until(() -> {
           return mo.tsMessageProcessed >= 1;
         });
         assertTrue("Tombstone GC message is not expected.", mo.thName.contains(
@@ -297,7 +405,7 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
         PRTombstoneMessageObserver mo =
             (PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
         // Should receive tombstone message for each bucket.
-        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+        Awaitility.await().atMost(60, SECONDS).until(() -> {
           return mo.prTsMessageProcessed >= 2;
         });
         assertEquals("Tombstone GC message is expected.", 2, mo.prTsMessageProcessed);
@@ -818,12 +926,17 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
 
 
   private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion) {
+    return createServerRegion(vm, regionName, replicatedRegion, Scope.DISTRIBUTED_ACK);
+  }
+
+  private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion,
+                                 Scope regionScope) {
     SerializableCallable createRegion = new SerializableCallable() {
       public Object call() throws Exception {
         // TombstoneService.VERBOSE = true;
         AttributesFactory af = new AttributesFactory();
         if (replicatedRegion) {
-          af.setScope(Scope.DISTRIBUTED_ACK);
+          af.setScope(regionScope);
           af.setDataPolicy(DataPolicy.REPLICATE);
         } else {
           af.setDataPolicy(DataPolicy.PARTITION);