You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2019/04/10 17:13:43 UTC

[geode] branch develop updated: GEODE-6152: Removed use of futures (optimized get) for proxy region. (#3371)

This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 045bb67  GEODE-6152: Removed use of futures (optimized get) for proxy region. (#3371)
045bb67 is described below

commit 045bb67c7e480bc21a9904c940cc43680a38356b
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed Apr 10 10:13:23 2019 -0700

    GEODE-6152: Removed use of futures (optimized get) for proxy region. (#3371)
    
    * GEODE-6152: Removed use of futures (optimized get) for proxy region.
---
 .../cache/RegionConcurrentOperationDUnitTest.java  | 145 ++++++++++++++
 .../sockets/ClientRegionGetRegressionTest.java     | 212 +++++++++++++++++++++
 .../apache/geode/internal/cache/LocalRegion.java   | 118 +++++++-----
 3 files changed, 431 insertions(+), 44 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
new file mode 100755
index 0000000..d0ccd1d
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PROXY;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class RegionConcurrentOperationDUnitTest implements Serializable {
+
+  private static DUnitBlackboard blackboard;
+
+  Object key = "KEY";
+  String value = "VALUE";
+
+  private static DUnitBlackboard getBlackboard() {
+    if (blackboard == null) {
+      blackboard = new DUnitBlackboard();
+    }
+    return blackboard;
+  }
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @After
+  public void tearDown() {
+    blackboard.initBlackboard();
+  }
+
+  @Test
+  public void getOnProxyRegionFromMultipleThreadsReturnsDifferentObjects() throws Exception {
+    VM member1 = getVM(0);
+    String regionName = getClass().getSimpleName();
+
+    cacheRule.createCache();
+    cacheRule.getCache().createRegionFactory(REPLICATE_PROXY).create(regionName);
+
+    member1.invoke(() -> {
+      cacheRule.createCache();
+      cacheRule.getCache().createRegionFactory(REPLICATE)
+          .setCacheLoader(new TestCacheLoader()).create(regionName);
+    });
+
+    Future get1 = executorServiceRule.submit(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future get2 = executorServiceRule.submit(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isNotSameAs(get2value);
+  }
+
+  @Test
+  public void getOnPreLoadedRegionFromMultipleThreadsReturnSameObject() throws Exception {
+    VM member1 = getVM(0);
+    String regionName = getClass().getSimpleName();
+
+    cacheRule.createCache();
+    cacheRule.getCache().createRegionFactory().setDataPolicy(DataPolicy.PRELOADED)
+        .setScope(Scope.DISTRIBUTED_ACK).create(regionName);
+
+    member1.invoke(() -> {
+      cacheRule.createCache();
+      cacheRule.getCache().createRegionFactory(REPLICATE)
+          .setCacheLoader(new TestCacheLoader()).create(regionName);
+    });
+    assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(0);
+
+    Future get1 = executorServiceRule.submit(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future get2 = executorServiceRule.submit(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isSameAs(get2value);
+    assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(1);
+  }
+
+  private class TestCacheLoader implements CacheLoader, Serializable {
+
+    @Override
+    public synchronized Object load(LoaderHelper helper) {
+      getBlackboard().signalGate("Loader");
+      return value;
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientRegionGetRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientRegionGetRegressionTest.java
new file mode 100644
index 0000000..ae9fe46
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientRegionGetRegressionTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(ClientServerTest.class)
+public class ClientRegionGetRegressionTest implements Serializable {
+
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+
+  private int port;
+  private VM server;
+  private static DUnitBlackboard blackboard;
+  private String key = "KEY-1";
+
+  private static DUnitBlackboard getBlackboard() {
+    if (blackboard == null) {
+      blackboard = new DUnitBlackboard();
+    }
+    return blackboard;
+  }
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @Before
+  public void setUp() throws Exception {
+    server = getVM(0);
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+
+    port = server.invoke(() -> createServerCache());
+    createClientCache();
+  }
+
+  @After
+  public void tearDown() {
+    blackboard.initBlackboard();
+  }
+
+  @Test
+  public void getOnProxyRegionFromMultipleThreadsReturnsDifferentObjects() throws Exception {
+
+    ClientRegionFactory<String, String> crf = clientCacheRule.getClientCache()
+        .createClientRegionFactory(ClientRegionShortcut.PROXY);
+    crf.create(regionName);
+
+    Future get1 = executorServiceRule.submit(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future get2 = executorServiceRule.submit(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isNotSameAs(get2value);
+  }
+
+  @Test
+  public void getOnCachingProxyRegionFromMultipleThreadsReturnSameObject() throws Exception {
+
+    ClientRegionFactory<String, String> crf = clientCacheRule.getClientCache()
+        .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+
+    crf.create(regionName);
+    Future get1 = executorServiceRule.submit(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future get2 = executorServiceRule.submit(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isSameAs(get2value);
+  }
+
+  @Test
+  public void getOnCachingProxyRegionWithCopyOnReadFromMultipleThreadsReturnsDifferentObject()
+      throws Exception {
+
+    ClientCache cache = clientCacheRule.getClientCache();
+    cache.setCopyOnRead(true);
+
+    ClientRegionFactory<String, String> crf =
+        cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
+
+    crf.create(regionName);
+    Future get1 = executorServiceRule.submit(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      return region.get(key);
+    });
+
+    Future get2 = executorServiceRule.submit(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS);
+      return region.get(key);
+    });
+
+    Object get1value = get1.get();
+    Object get2value = get2.get();
+
+    assertThat(get1value).isNotSameAs(get2value);
+  }
+
+  private void createClientCache() {
+    ClientCacheFactory clientCacheFactory = new ClientCacheFactory().addPoolServer(hostName, port);
+    clientCacheRule.createClientCache(clientCacheFactory);
+  }
+
+  private int createServerCache() throws IOException {
+    cacheRule.createCache();
+
+    RegionFactory<?, ?> regionFactory = cacheRule.getCache().createRegionFactory(REPLICATE);
+    regionFactory.setCacheLoader(new TestCacheLoader()).create(regionName);
+
+    CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+    return cacheServer.getPort();
+  }
+
+  public class TestObject implements Serializable {
+    int id;
+
+    TestObject(int id) {
+      this.id = id;
+    }
+  }
+
+  private class TestCacheLoader implements CacheLoader, Serializable {
+    @Override
+    public synchronized Object load(LoaderHelper helper) {
+      getBlackboard().signalGate("Loader");
+      return new TestObject(1);
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index df5a25f..2dae4a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -1467,8 +1467,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   /**
-   * optimized to only allow one thread to do a search/load, other threads wait on a future
-   *
    * @param isCreate true if call found no entry; false if updating an existing entry
    * @param localValue the value retrieved from the region for this object.
    * @param disableCopyOnRead if true then do not make a copy
@@ -1481,8 +1479,67 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       Object localValue, boolean disableCopyOnRead, boolean preferCD,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
       boolean returnTombstones) throws TimeoutException, CacheLoaderException {
-
     @Retained
+    Object result;
+    if (isProxy()) {
+      result =
+          getObject(keyInfo, isCreate, generateCallbacks, localValue, disableCopyOnRead, preferCD,
+              requestingClient, clientEvent, returnTombstones);
+    } else {
+      result = optimizedGetObject(keyInfo, isCreate, generateCallbacks, localValue,
+          disableCopyOnRead, preferCD,
+          requestingClient, clientEvent, returnTombstones);
+    }
+    return result;
+  }
+
+
+  private Object getObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
+      Object localValue, boolean disableCopyOnRead, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+      boolean returnTombstones) {
+    Object result;
+    boolean partitioned = getDataPolicy().withPartitioning();
+    if (!partitioned) {
+      localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD,
+          clientEvent, false, false);
+
+      // stats have now been updated
+      if (localValue != null && !Token.isInvalid(localValue)) {
+        result = localValue;
+        return result;
+      }
+      isCreate = localValue == null;
+      result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks, localValue,
+          disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+
+    } else {
+      // For PRs we don't want to deserialize the value and we can't use findObjectInSystem
+      // because it can invoke code that is transactional.
+      result =
+          getSharedDataView().findObject(keyInfo, this, isCreate, generateCallbacks, localValue,
+              disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+    }
+
+    if (result == null && localValue != null) {
+      if (localValue != Token.TOMBSTONE || returnTombstones) {
+        result = localValue;
+      }
+    }
+    // findObjectInSystem does not call conditionalCopy
+    if (!disableCopyOnRead) {
+      result = conditionalCopy(result);
+    }
+    return result;
+  }
+
+  /**
+   * optimized to only allow one thread to do a search/load, other threads wait on a future
+   */
+  private Object optimizedGetObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks,
+      Object localValue, boolean disableCopyOnRead, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+      boolean returnTombstones) {
     Object result = null;
     FutureResult thisFuture = new FutureResult(stopper);
     Future otherFuture = (Future) getFutures.putIfAbsent(keyInfo.getKey(), thisFuture);
@@ -1498,7 +1555,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
           if (!preferCD && result instanceof CachedDeserializable) {
             CachedDeserializable cd = (CachedDeserializable) result;
             // fix for bug 43023
-            if (!disableCopyOnRead && isCopyOnRead()) {
+            if (!disableCopyOnRead && (isCopyOnRead() || isProxy())) {
               result = cd.getDeserializedWritableCopy(null, null);
             } else {
               result = cd.getDeserializedForReading();
@@ -1514,7 +1571,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
           }
           return result;
         }
-        // if value == null, try our own search/load
       } catch (InterruptedException ignore) {
         Thread.currentThread().interrupt();
         // TODO check a CancelCriterion here?
@@ -1526,53 +1582,27 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
             "unexpected exception", e);
       }
     }
-    // didn't find a future, do one more probe for the entry to catch a race
-    // condition where the future was just removed by another thread
     try {
-      boolean partitioned = getDataPolicy().withPartitioning();
-      if (!partitioned) {
-        localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD,
-            clientEvent, false, false);
-
-        // stats have now been updated
-        if (localValue != null && !Token.isInvalid(localValue)) {
-          result = localValue;
-          return result;
-        }
-        isCreate = localValue == null;
-        result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks, localValue,
-            disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
-
-      } else {
+      result =
+          getObject(keyInfo, isCreate, generateCallbacks, localValue, disableCopyOnRead, preferCD,
+              requestingClient, clientEvent, returnTombstones);
 
-        // For PRs we don't want to deserialize the value and we can't use findObjectInSystem
-        // because it can invoke code that is transactional.
-        result =
-            getSharedDataView().findObject(keyInfo, this, isCreate, generateCallbacks, localValue,
-                disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
-      }
-
-      if (result == null && localValue != null) {
-        if (localValue != Token.TOMBSTONE || returnTombstones) {
-          result = localValue;
-        }
-      }
-      // findObjectInSystem does not call conditionalCopy
     } finally {
-      if (result != null) {
-        VersionTag tag = clientEvent == null ? null : clientEvent.getVersionTag();
-        thisFuture.set(new Object[] {result, tag});
-      } else {
-        thisFuture.set(null);
+      if (otherFuture == null) {
+        if (result != null) {
+          VersionTag tag = clientEvent == null ? null : clientEvent.getVersionTag();
+          thisFuture.set(new Object[] {result, tag});
+        } else {
+          thisFuture.set(null);
+        }
+        getFutures.remove(keyInfo.getKey());
       }
-      getFutures.remove(keyInfo.getKey());
-    }
-    if (!disableCopyOnRead) {
-      result = conditionalCopy(result);
     }
+
     return result;
   }
 
+
   /**
    * Returns true if get should give a copy; false if a reference.
    *