You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2019/04/10 17:13:43 UTC
[geode] branch develop updated: GEODE-6152: Removed use of futures
(optimized get) for proxy region. (#3371)
This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 045bb67 GEODE-6152: Removed use of futures (optimized get) for proxy region. (#3371)
045bb67 is described below
commit 045bb67c7e480bc21a9904c940cc43680a38356b
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed Apr 10 10:13:23 2019 -0700
GEODE-6152: Removed use of futures (optimized get) for proxy region. (#3371)
* GEODE-6152: Removed use of futures (optimized get) for proxy region.
---
.../cache/RegionConcurrentOperationDUnitTest.java | 145 ++++++++++++++
.../sockets/ClientRegionGetRegressionTest.java | 212 +++++++++++++++++++++
.../apache/geode/internal/cache/LocalRegion.java | 118 +++++++-----
3 files changed, 431 insertions(+), 44 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
new file mode 100755
index 0000000..d0ccd1d
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PROXY;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+/**
+ * Distributed tests for concurrent {@code Region.get} calls on the same key when the value is
+ * produced by a {@link CacheLoader} installed on a REPLICATE region in another member.
+ */
+public class RegionConcurrentOperationDUnitTest implements Serializable {
+
+  /** Name of the blackboard gate that {@link TestCacheLoader} signals when a load starts. */
+  private static final String LOADER_GATE = "Loader";
+
+  private static DUnitBlackboard blackboard;
+
+  private final Object key = "KEY";
+  private final String value = "VALUE";
+
+  /** Returns this JVM's blackboard handle, creating it on first use. */
+  private static DUnitBlackboard getBlackboard() {
+    if (blackboard == null) {
+      blackboard = new DUnitBlackboard();
+    }
+    return blackboard;
+  }
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @After
+  public void tearDown() {
+    // Go through the accessor: the static field is still null in this JVM when a test fails
+    // before anything called getBlackboard(), and a NullPointerException thrown from here would
+    // mask the original failure.
+    getBlackboard().initBlackboard();
+  }
+
+  /**
+   * This member hosts a REPLICATE_PROXY region while VM 0 hosts the REPLICATE region with the
+   * loader. Two threads get the same key, the second one only after the loader gate has been
+   * signalled, and the two results must not be the same instance.
+   */
+  @Test
+  public void getOnProxyRegionFromMultipleThreadsReturnsDifferentObjects() throws Exception {
+    VM member1 = getVM(0);
+    String regionName = getClass().getSimpleName();
+
+    cacheRule.createCache();
+    cacheRule.getCache().createRegionFactory(REPLICATE_PROXY).create(regionName);
+
+    member1.invoke(() -> {
+      cacheRule.createCache();
+      cacheRule.getCache().createRegionFactory(REPLICATE)
+          .setCacheLoader(new TestCacheLoader()).create(regionName);
+    });
+
+    Future<Object> get1 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = cacheRule.getCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future<Object> get2 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = cacheRule.getCache().getRegion(regionName);
+      getBlackboard().waitForGate(LOADER_GATE, 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isNotSameAs(get2value);
+  }
+
+  /**
+   * This member hosts a PRELOADED, DISTRIBUTED_ACK region that starts empty while VM 0 hosts the
+   * REPLICATE region with the loader. Two threads get the same key, the second one only after the
+   * loader gate has been signalled; both must receive the same instance and the local region
+   * must end up holding exactly one entry.
+   */
+  @Test
+  public void getOnPreLoadedRegionFromMultipleThreadsReturnSameObject() throws Exception {
+    VM member1 = getVM(0);
+    String regionName = getClass().getSimpleName();
+
+    cacheRule.createCache();
+    cacheRule.getCache().createRegionFactory().setDataPolicy(DataPolicy.PRELOADED)
+        .setScope(Scope.DISTRIBUTED_ACK).create(regionName);
+
+    member1.invoke(() -> {
+      cacheRule.createCache();
+      cacheRule.getCache().createRegionFactory(REPLICATE)
+          .setCacheLoader(new TestCacheLoader()).create(regionName);
+    });
+    assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(0);
+
+    Future<Object> get1 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = cacheRule.getCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future<Object> get2 = executorServiceRule.submit(() -> {
+      Region<Object, Object> region = cacheRule.getCache().getRegion(regionName);
+      getBlackboard().waitForGate(LOADER_GATE, 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isSameAs(get2value);
+    assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(1);
+  }
+
+  /**
+   * Loader that signals {@link #LOADER_GATE} on the blackboard and then returns the enclosing
+   * test's {@code value}. It stays an inner (non-static) class because it reads that field.
+   */
+  private class TestCacheLoader implements CacheLoader<Object, Object>, Serializable {
+
+    @Override
+    public synchronized Object load(LoaderHelper<Object, Object> helper) {
+      getBlackboard().signalGate(LOADER_GATE);
+      return value;
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientRegionGetRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientRegionGetRegressionTest.java
new file mode 100644
index 0000000..ae9fe46
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientRegionGetRegressionTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(ClientServerTest.class)
+public class ClientRegionGetRegressionTest implements Serializable {
+
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+
+ private int port;
+ private VM server;
+ private static DUnitBlackboard blackboard;
+ private String key = "KEY-1";
+
+ private static DUnitBlackboard getBlackboard() {
+ if (blackboard == null) {
+ blackboard = new DUnitBlackboard();
+ }
+ return blackboard;
+ }
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ @Before
+ public void setUp() throws Exception {
+ server = getVM(0);
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+
+ port = server.invoke(() -> createServerCache());
+ createClientCache();
+ }
+
+ @After
+ public void tearDown() {
+ blackboard.initBlackboard();
+ }
+
+ @Test
+ public void getOnProxyRegionFromMultipleThreadsReturnsDifferentObjects() throws Exception {
+
+ ClientRegionFactory<String, String> crf = clientCacheRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.PROXY);
+ crf.create(regionName);
+
+ Future get1 = executorServiceRule.submit(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ return region.get(key);
+ });
+
+ Future get2 = executorServiceRule.submit(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+ return region.get(key);
+ });
+
+ Object get1value = get1.get();
+ Object get2value = get2.get();
+
+ assertThat(get1value).isNotSameAs(get2value);
+ }
+
+ @Test
+ public void getOnCachingProxyRegionFromMultipleThreadsReturnSameObject() throws Exception {
+
+ ClientRegionFactory<String, String> crf = clientCacheRule.getClientCache()
+ .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+
+ crf.create(regionName);
+ Future get1 = executorServiceRule.submit(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ return region.get(key);
+ });
+
+ Future get2 = executorServiceRule.submit(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+ return region.get(key);
+ });
+
+ Object get1value = get1.get();
+ Object get2value = get2.get();
+
+ assertThat(get1value).isSameAs(get2value);
+ }
+
+ @Test
+ public void getOnCachingProxyRegionWithCopyOnReadFromMultipleThreadsReturnsDifferentObject()
+ throws Exception {
+
+ ClientCache cache = clientCacheRule.getClientCache();
+ cache.setCopyOnRead(true);
+
+ ClientRegionFactory<String, String> crf =
+ cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+
+ crf.create(regionName);
+ Future get1 = executorServiceRule.submit(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ return region.get(key);
+ });
+
+ Future get2 = executorServiceRule.submit(() -> {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+ return region.get(key);
+ });
+
+ Object get1value = get1.get();
+ Object get2value = get2.get();
+
+ assertThat(get1value).isNotSameAs(get2value);
+ }
+
+ private void createClientCache() {
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory().addPoolServer(hostName, port);
+ clientCacheRule.createClientCache(clientCacheFactory);
+ }
+
+ private int createServerCache() throws IOException {
+ cacheRule.createCache();
+
+ RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+ regionFactory.setCacheLoader(new TestCacheLoader()).create(regionName);
+
+ CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+ cacheServer.setPort(0);
+ cacheServer.start();
+ return cacheServer.getPort();
+ }
+
+ public class TestObject implements Serializable {
+ int id;
+
+ TestObject(int id) {
+ this.id = id;
+ }
+ }
+
+ private class TestCacheLoader implements CacheLoader, Serializable {
+ @Override
+ public synchronized Object load(LoaderHelper helper) {
+ getBlackboard().signalGate("Loader");
+ return new TestObject(1);
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index df5a25f..2dae4a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1467,8 +1467,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
/**
- * optimized to only allow one thread to do a search/load, other threads wait on a future
- *
* @param isCreate true if call found no entry; false if updating an existing entry
* @param localValue the value retrieved from the region for this object.
* @param disableCopyOnRead if true then do not make a copy
@@ -1481,8 +1479,67 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
Object localValue, boolean disableCopyOnRead, boolean preferCD,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) throws TimeoutException, CacheLoaderException {
-
@Retained
+ Object result;
+ if (isProxy()) {
+ result =
+ getObject(keyInfo, isCreate, generateCallbacks, localValue, disableCopyOnRead, preferCD,
+ requestingClient, clientEvent, returnTombstones);
+ } else {
+ result = optimizedGetObject(keyInfo, isCreate, generateCallbacks, localValue,
+ disableCopyOnRead, preferCD,
+ requestingClient, clientEvent, returnTombstones);
+ }
+ return result;
+ }
+
+
+ /**
+  * Performs the lookup for a get directly, without registering or waiting on an entry in
+  * {@code getFutures}.
+  *
+  * <p>
+  * For a region without partitioning the local value is read first and returned as-is when it is
+  * non-null and not an invalid token; otherwise the lookup continues through
+  * {@code findObjectInSystem}. For a region with partitioning the lookup is delegated to the
+  * shared data view.
+  *
+  * @param isCreate true if call found no entry; false if updating an existing entry
+  * @param localValue the value retrieved from the region for this object
+  * @param disableCopyOnRead if true then do not make a copy
+  * @return the value found, or the local value when nothing was found and that local value is
+  *         not a tombstone (or {@code returnTombstones} is set); may be null
+  */
+ private Object getObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
+     Object localValue, boolean disableCopyOnRead, boolean preferCD,
+     ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+     boolean returnTombstones) {
+   Object found;
+   if (getDataPolicy().withPartitioning()) {
+     // For PRs we don't want to deserialize the value and we can't use findObjectInSystem
+     // because it can invoke code that is transactional.
+     found =
+         getSharedDataView().findObject(keyInfo, this, isCreate, generateCallbacks, localValue,
+             disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+   } else {
+     localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD,
+         clientEvent, false, false);
+
+     // stats have now been updated
+     if (localValue != null && !Token.isInvalid(localValue)) {
+       return localValue;
+     }
+     // No usable local value: search the rest of the system, treating this as a create only
+     // when there was no local value at all.
+     found = findObjectInSystem(keyInfo, localValue == null, null, generateCallbacks, localValue,
+         disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+   }
+
+   // Nothing was found: fall back to the local value unless it is a tombstone the caller did not
+   // ask for.
+   if (found == null && localValue != null
+       && (returnTombstones || localValue != Token.TOMBSTONE)) {
+     found = localValue;
+   }
+   // findObjectInSystem does not call conditionalCopy
+   return disableCopyOnRead ? found : conditionalCopy(found);
+ }
+
+ /**
+ * optimized to only allow one thread to do a search/load, other threads wait on a future
+ */
+ private Object optimizedGetObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
+ Object localValue, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+ boolean returnTombstones) {
Object result = null;
FutureResult thisFuture = new FutureResult(stopper);
Future otherFuture = (Future) getFutures.putIfAbsent(keyInfo.getKey(), thisFuture);
@@ -1498,7 +1555,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
if (!preferCD && result instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable) result;
// fix for bug 43023
- if (!disableCopyOnRead && isCopyOnRead()) {
+ if (!disableCopyOnRead && (isCopyOnRead() || isProxy())) {
result = cd.getDeserializedWritableCopy(null, null);
} else {
result = cd.getDeserializedForReading();
@@ -1514,7 +1571,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
return result;
}
- // if value == null, try our own search/load
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
// TODO check a CancelCriterion here?
@@ -1526,53 +1582,27 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
"unexpected exception", e);
}
}
- // didn't find a future, do one more probe for the entry to catch a race
- // condition where the future was just removed by another thread
try {
- boolean partitioned = getDataPolicy().withPartitioning();
- if (!partitioned) {
- localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD,
- clientEvent, false, false);
-
- // stats have now been updated
- if (localValue != null && !Token.isInvalid(localValue)) {
- result = localValue;
- return result;
- }
- isCreate = localValue == null;
- result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks, localValue,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
-
- } else {
+ result =
+ getObject(keyInfo, isCreate, generateCallbacks, localValue, disableCopyOnRead, preferCD,
+ requestingClient, clientEvent, returnTombstones);
- // For PRs we don't want to deserialize the value and we can't use findObjectInSystem
- // because it can invoke code that is transactional.
- result =
- getSharedDataView().findObject(keyInfo, this, isCreate, generateCallbacks, localValue,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
- }
-
- if (result == null && localValue != null) {
- if (localValue != Token.TOMBSTONE || returnTombstones) {
- result = localValue;
- }
- }
- // findObjectInSystem does not call conditionalCopy
} finally {
- if (result != null) {
- VersionTag tag = clientEvent == null ? null : clientEvent.getVersionTag();
- thisFuture.set(new Object[] {result, tag});
- } else {
- thisFuture.set(null);
+ if (otherFuture == null) {
+ if (result != null) {
+ VersionTag tag = clientEvent == null ? null : clientEvent.getVersionTag();
+ thisFuture.set(new Object[] {result, tag});
+ } else {
+ thisFuture.set(null);
+ }
+ getFutures.remove(keyInfo.getKey());
}
- getFutures.remove(keyInfo.getKey());
- }
- if (!disableCopyOnRead) {
- result = conditionalCopy(result);
}
+
return result;
}
+
/**
* Returns true if get should give a copy; false if a reference.
*