You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/08/31 01:26:09 UTC
[32/47] geode git commit: GEODE-3519 servers are not locking on some
ops initiated by clients
GEODE-3519 servers are not locking on some ops initiated by clients
While investigating dlocktoken cleanup, I discovered that a number of
operations coming from a client were not locking entries for Scope.GLOBAL
regions on servers. Only put, putIfAbsent and variants of replace were
obtaining locks. This change makes client-initiated remove, destroy and
invalidate obtain the lock as well.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/2637bd87
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/2637bd87
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/2637bd87
Branch: refs/heads/feature/GEODE-3543
Commit: 2637bd8794b76e048f2fb13007ad0a50842059d8
Parents: 4ac4600
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Aug 29 14:07:52 2017 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Aug 29 14:08:53 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/DistributedRegion.java | 49 +++++++
.../geode/cache30/ClientServerCCEDUnitTest.java | 129 +++++++++++++++++--
2 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/2637bd87/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d9cf1ed..e882ed1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -48,6 +48,7 @@ import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.cache.EntryExistsException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.LossAction;
import org.apache.geode.cache.MembershipAttributes;
@@ -1500,6 +1501,54 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
}
+ /**
+ * Performs a client-initiated remove of {@code key} while holding the lock returned by
+ * {@code getDistributedLockIfGlobal(key)}, when that call returns one.
+ * <p>
+ * All arguments are passed unchanged to the superclass implementation. Whether or not that call
+ * throws, a non-null lock is unlocked and the lock service is then asked to free the resources
+ * it holds for the key.
+ * <p>
+ * NOTE(review): presumably the lock is non-null only for Scope.GLOBAL regions - confirm against
+ * getDistributedLockIfGlobal.
+ */
@Override
+ public void basicBridgeRemove(Object key, Object expectedOldValue, Object p_callbackArg,
+ ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent)
+ throws TimeoutException, EntryNotFoundException, CacheWriterException {
+ // obtained before the try block, so the finally clause only ever releases a lock that was taken
+ Lock lock = getDistributedLockIfGlobal(key);
+ try {
+ super.basicBridgeRemove(key, expectedOldValue, p_callbackArg, memberId, fromClient,
+ clientEvent);
+ } finally {
+ // a null lock means no lock was taken for this operation, so there is nothing to release
+ if (lock != null) {
+ logger.debug("releasing distributed lock on {}", key);
+ lock.unlock();
+ getLockService().freeResources(key);
+ }
+ }
+ }
+
+ /**
+ * Performs a client-initiated destroy of {@code key} while holding the lock returned by
+ * {@code getDistributedLockIfGlobal(key)}, when that call returns one.
+ * <p>
+ * All arguments are passed unchanged to the superclass implementation. Whether or not that call
+ * throws, a non-null lock is unlocked and the lock service is then asked to free the resources
+ * it holds for the key.
+ * <p>
+ * NOTE(review): presumably the lock is non-null only for Scope.GLOBAL regions - confirm against
+ * getDistributedLockIfGlobal.
+ */
+ @Override
+ public void basicBridgeDestroy(Object key, Object p_callbackArg, ClientProxyMembershipID memberId,
+ boolean fromClient, EntryEventImpl clientEvent)
+ throws TimeoutException, EntryNotFoundException, CacheWriterException {
+ // obtained before the try block, so the finally clause only ever releases a lock that was taken
+ Lock lock = getDistributedLockIfGlobal(key);
+ try {
+ super.basicBridgeDestroy(key, p_callbackArg, memberId, fromClient, clientEvent);
+ } finally {
+ // a null lock means no lock was taken for this operation, so there is nothing to release
+ if (lock != null) {
+ logger.debug("releasing distributed lock on {}", key);
+ lock.unlock();
+ getLockService().freeResources(key);
+ }
+ }
+ }
+
+ /**
+ * Performs a client-initiated invalidate of {@code key} while holding the lock returned by
+ * {@code getDistributedLockIfGlobal(key)}, when that call returns one.
+ * <p>
+ * All arguments are passed unchanged to the superclass implementation. Whether or not that call
+ * throws, a non-null lock is unlocked afterwards.
+ * <p>
+ * NOTE(review): unlike basicBridgeRemove and basicBridgeDestroy, this method does not call
+ * getLockService().freeResources(key) after unlocking. Presumably that is intentional because
+ * the entry still exists after an invalidate - confirm.
+ */
+ @Override
+ public void basicBridgeInvalidate(Object key, Object p_callbackArg,
+ ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent)
+ throws TimeoutException, EntryNotFoundException, CacheWriterException {
+ // obtained before the try block, so the finally clause only ever releases a lock that was taken
+ Lock lock = getDistributedLockIfGlobal(key);
+ try {
+ super.basicBridgeInvalidate(key, p_callbackArg, memberId, fromClient, clientEvent);
+ } finally {
+ // a null lock means no lock was taken for this operation, so there is nothing to release
+ if (lock != null) {
+ logger.debug("releasing distributed lock on {}", key);
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
protected void basicDestroy(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
throws EntryNotFoundException, CacheWriterException, TimeoutException {
// disallow local destruction for mirrored keysvalues regions
http://git-wip-us.apache.org/repos/asf/geode/blob/2637bd87/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
index b4224e0..b9301dd 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.cache30;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.*;
import static org.junit.Assert.*;
@@ -27,23 +28,30 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.ExpirationAction;
import org.apache.geode.cache.ExpirationAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.locks.DLockService;
+import org.apache.geode.distributed.internal.locks.DistributedLockStats;
+import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
@@ -98,6 +106,106 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
}
+  /**
+   * GEODE-3519: servers must lock on remove and invalidate operations initiated by clients.
+   * <p>
+   * Two servers host the same replicated region with Scope.GLOBAL and each server has one client
+   * attached. Both clients operate on the same key at the same time. Afterwards every server must
+   * report zero conflated events and a completed lock-release count equal to the number of
+   * operations performed by a single client.
+   */
+  @Test
+  public void testClientEventsAreNotConflatedByGlobalRegionOnServer() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM[] servers = {host.getVM(0), host.getVM(1)};
+    final VM[] clients = {host.getVM(2), host.getVM(3)};
+    final String name = this.getUniqueName() + "Region";
+
+    // create the Scope.GLOBAL region on every server and keep the value each call returns
+    final int[] ports = new int[servers.length];
+    for (int s = 0; s < servers.length; s++) {
+      ports[s] = createServerRegion(servers[s], name, true, Scope.GLOBAL);
+    }
+
+    // client c is pointed at the port obtained from server c
+    for (int c = 0; c < clients.length; c++) {
+      createClientRegion(clients[c], name, ports[c], false, ClientRegionShortcut.CACHING_PROXY,
+          false);
+    }
+
+    getBlackboard().initBlackboard();
+
+    final int numIterations = 500;
+
+    final AsyncInvocation[] running = new AsyncInvocation[clients.length];
+    for (int c = 0; c < clients.length; c++) {
+      final String readyGate = "client" + c + "Ready";
+      running[c] =
+          clients[c].invokeAsync("doOps Thread", () -> doOps(name, numIterations, readyGate));
+      // do not launch the next client until this one has signalled its gate
+      getBlackboard().waitForGate(readyGate, 30, SECONDS);
+    }
+
+    // open the shared gate that every client is waiting on
+    getBlackboard().signalGate("proceed");
+
+    // NOTE(review): confirm whether join() surfaces a failure thrown inside doOps
+    for (AsyncInvocation invocation : running) {
+      invocation.join();
+    }
+
+    for (VM server : servers) {
+      server.invoke("verify thread", () -> verifyServerState(name, numIterations));
+    }
+  }
+
+  /**
+   * Runs in a server VM and checks the statistics of the named region.
+   *
+   * @param name name of the region to inspect
+   * @param numIterations expected number of completed lock releases
+   */
+  private void verifyServerState(String name, int numIterations) {
+    DistributedRegion serverRegion =
+        (DistributedRegion) CacheFactory.getAnyInstance().getRegion(name);
+
+    // the region must not have conflated any event
+    assertEquals(0, serverRegion.getCachePerfStats().getConflatedEventsCount());
+
+    // the region's lock service must have completed the expected number of lock releases
+    DistributedLockStats lockStats = ((DLockService) serverRegion.getLockService()).getStats();
+    assertEquals(numIterations, lockStats.getLockReleasesCompleted());
+  }
+
+  /**
+   * Runs in a client VM: performs {@code numIterations} operations on one fixed key, cycling
+   * through remove, putIfAbsent, invalidate and the two forms of replace.
+   * <p>
+   * After looking up the region it signals {@code clientGateName} on the blackboard and then
+   * waits up to 30 seconds for the "proceed" gate before the first operation.
+   *
+   * @param name name of the client region to operate on
+   * @param numIterations total number of operations to perform
+   * @param clientGateName blackboard gate signalled once this client is ready to start
+   * @throws RuntimeException if waiting for the "proceed" gate times out or is interrupted
+   */
+  private void doOps(String name, int numIterations, String clientGateName) {
+    ClientCache cache = ClientCacheFactory.getAnyInstance();
+    Region<String, String> region = cache.getRegion(name);
+    getBlackboard().signalGate(clientGateName);
+    try {
+      getBlackboard().waitForGate("proceed", 30, SECONDS);
+    } catch (InterruptedException e) {
+      // restore the interrupt status so that callers further up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("failed to start", e);
+    } catch (TimeoutException e) {
+      throw new RuntimeException("failed to start", e);
+    }
+    String key = "lockingKey";
+    String value = "lockingValue";
+    // remove(key, value) is deliberately not part of the cycle: it checks the old value against
+    // what is in the local cache before sending the operation to the server
+    for (int j = 0; j < numIterations; j++) {
+      int operation = j % 5;
+      switch (operation) {
+        case 0:
+          region.remove(key);
+          break;
+        case 1:
+          region.putIfAbsent(key, value);
+          break;
+        case 2:
+          region.invalidate(key);
+          break;
+        case 3:
+          region.replace(key, value);
+          break;
+        case 4:
+          region.replace(key, value, value);
+          break;
+        default:
+          throw new RuntimeException("" + j + " % 5 == " + operation + "?");
+      }
+    }
+  }
+
@Test
public void testClientDoesNotExpireEntryPrematurely() throws Exception {
Host host = Host.getHost(0);
@@ -144,8 +252,8 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
final long expirationTime = System.currentTimeMillis() + (expirationSeconds * 1000);
- Awaitility.await("waiting for object to expire")
- .atMost(expirationSeconds * 2, TimeUnit.SECONDS).until(() -> {
+ Awaitility.await("waiting for object to expire").atMost(expirationSeconds * 2, SECONDS)
+ .until(() -> {
return expirationTimeMillis[0] != null;
});
@@ -246,7 +354,7 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
vm1.invoke(() -> {
PRTombstoneMessageObserver mo =
(PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+ Awaitility.await().atMost(60, SECONDS).until(() -> {
return mo.tsMessageProcessed >= 1;
});
assertTrue("Tombstone GC message is not expected.", mo.thName.contains(
@@ -297,7 +405,7 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
PRTombstoneMessageObserver mo =
(PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
// Should receive tombstone message for each bucket.
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+ Awaitility.await().atMost(60, SECONDS).until(() -> {
return mo.prTsMessageProcessed >= 2;
});
assertEquals("Tombstone GC message is expected.", 2, mo.prTsMessageProcessed);
@@ -818,12 +926,17 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
+ /**
+ * Convenience overload that delegates to the four-argument {@code createServerRegion}, passing
+ * {@code Scope.DISTRIBUTED_ACK} as the region scope.
+ *
+ * @return whatever the four-argument overload returns; callers in this test use it as the port
+ * that clients connect to
+ */
private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion) {
+ return createServerRegion(vm, regionName, replicatedRegion, Scope.DISTRIBUTED_ACK);
+ }
+
+ private int createServerRegion(VM vm, final String regionName, final boolean replicatedRegion,
+ Scope regionScope) {
SerializableCallable createRegion = new SerializableCallable() {
public Object call() throws Exception {
// TombstoneService.VERBOSE = true;
AttributesFactory af = new AttributesFactory();
if (replicatedRegion) {
- af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setScope(regionScope);
af.setDataPolicy(DataPolicy.REPLICATE);
} else {
af.setDataPolicy(DataPolicy.PARTITION);