You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:29:05 UTC
[geode] 19/19: GEODE-7672: add dunit test to verify OQL index after
PR clear. (#5436)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c9e4a62f8da2e052c4e3e4dc1e467e00f6b3a4f3
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Tue Sep 8 12:19:52 2020 -0700
GEODE-7672: add dunit test to verify OQL index after PR clear. (#5436)
* require rvv lock when create index
---
.../partitioned/PRClearQueryIndexDUnitTest.java | 376 +++++++++++++++++++++
.../cache/query/internal/DefaultQueryService.java | 4 +-
.../cache/query/internal/index/IndexManager.java | 4 +-
.../internal/cache/PartitionedRegionClear.java | 5 +-
.../geode/test/dunit/rules/ClusterStartupRule.java | 23 +-
.../org/apache/geode/cache/query/data/City.java | 5 +-
6 files changed, 410 insertions(+), 7 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearQueryIndexDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearQueryIndexDUnitTest.java
new file mode 100644
index 0000000..feed3fc
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearQueryIndexDUnitTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.query.partitioned;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.junit.rules.VMProvider.invokeInEveryMember;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.IndexStatistics;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.data.City;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ClientCacheRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class PRClearQueryIndexDUnitTest {
+ public static final String MUMBAI_QUERY = "select * from /cities c where c.name = 'MUMBAI'";
+ public static final String ID_10_QUERY = "select * from /cities c where c.id = 10";
+ @ClassRule
+ public static ClusterStartupRule cluster = new ClusterStartupRule(4, true);
+
+ private static MemberVM server1;
+ private static MemberVM server2;
+
+ private static DUnitBlackboard blackboard;
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public ExecutorServiceRule executor = ExecutorServiceRule.builder().build();
+
+ private ClientCache clientCache;
+ private Region cities;
+
+ // class test setup. set up the servers, regions and indexes on the servers
+ @BeforeClass
+ public static void beforeClass() {
+ int locatorPort = ClusterStartupRule.getDUnitLocatorPort();
+ server1 = cluster.startServerVM(1, s -> s.withConnectionToLocator(locatorPort)
+ .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*")
+ .withRegion(RegionShortcut.PARTITION, "cities"));
+ server2 = cluster.startServerVM(2, s -> s.withConnectionToLocator(locatorPort)
+ .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*")
+ .withRegion(RegionShortcut.PARTITION, "cities"));
+
+ server1.invoke(() -> {
+ Cache cache = ClusterStartupRule.getCache();
+ Region region = cache.getRegion("cities");
+ // create indexes
+ QueryService queryService = cache.getQueryService();
+ queryService.createKeyIndex("cityId", "c.id", "/cities c");
+ queryService.createIndex("cityName", "c.name", "/cities c");
+ assertThat(cache.getQueryService().getIndexes(region))
+ .extracting(Index::getName).containsExactlyInAnyOrder("cityId", "cityName");
+ });
+
+ server2.invoke(() -> {
+ Cache cache = ClusterStartupRule.getCache();
+ Region region = cache.getRegion("cities");
+ assertThat(cache.getQueryService().getIndexes(region))
+ .extracting(Index::getName).containsExactlyInAnyOrder("cityId", "cityName");
+ });
+ }
+
+ // before every test method, create the client cache and region
+ @Before
+ public void before() throws Exception {
+ int locatorPort = ClusterStartupRule.getDUnitLocatorPort();
+ clientCache = clientCacheRule.withLocatorConnection(locatorPort).createCache();
+ cities = clientCacheRule.createProxyRegion("cities");
+ }
+
+ @Test
+ public void clearOnEmptyRegion() throws Exception {
+ cities.clear();
+ invokeInEveryMember(() -> {
+ verifyIndexesAfterClear("cities", "cityId", "cityName");
+ }, server1, server2);
+
+ IntStream.range(0, 10).forEach(i -> cities.put(i, new City(i)));
+ cities.clear();
+ invokeInEveryMember(() -> {
+ verifyIndexesAfterClear("cities", "cityId", "cityName");
+ }, server1, server2);
+ }
+
+ @Test
+ public void createIndexWhileClear() throws Exception {
+ IntStream.range(0, 1000).forEach(i -> cities.put(i, new City(i)));
+
+ // create index while clear
+ AsyncInvocation createIndex = server1.invokeAsync("create index", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ QueryService queryService = cache.getQueryService();
+ Index cityZip = queryService.createIndex("cityZip", "c.zip", "/cities c");
+ assertThat(cityZip).isNotNull();
+ });
+
+ // do clear for 3 times at the same time to increease the concurrency of clear and createIndex
+ for (int i = 0; i < 3; i++) {
+ cities.clear();
+ }
+ createIndex.await();
+
+ invokeInEveryMember(() -> {
+ verifyIndexesAfterClear("cities", "cityId", "cityName");
+ }, server1, server2);
+
+ QueryService queryService = clientCache.getQueryService();
+ Query query =
+ queryService.newQuery("select * from /cities c where c.zip < " + (City.ZIP_START + 10));
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(0);
+
+ IntStream.range(0, 10).forEach(i -> cities.put(i, new City(i)));
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(10);
+ }
+
+ @Test
+ public void createIndexWhileClearOnReplicateRegion() throws Exception {
+ invokeInEveryMember(() -> {
+ Cache cache = ClusterStartupRule.getCache();
+ cache.createRegionFactory(RegionShortcut.PARTITION)
+ .create("replicateCities");
+ }, server1, server2);
+
+ Region replicateCities = clientCacheRule.createProxyRegion("replicateCities");
+ IntStream.range(0, 1000).forEach(i -> replicateCities.put(i, new City(i)));
+
+ // create index while clear
+ AsyncInvocation createIndex = server1.invokeAsync("create index on replicate regions", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ QueryService queryService = cache.getQueryService();
+ Index cityZip = queryService.createIndex("cityZip_replicate", "c.zip", "/replicateCities c");
+ assertThat(cityZip).isNotNull();
+ });
+
+ // do clear at the same time for 3 timese
+ for (int i = 0; i < 3; i++) {
+ replicateCities.clear();
+ }
+ createIndex.await();
+
+ invokeInEveryMember(() -> {
+ verifyIndexesAfterClear("replicateCities", "cityZip_replicate");
+ }, server1, server2);
+
+ QueryService queryService = clientCache.getQueryService();
+ Query query =
+ queryService
+ .newQuery("select * from /replicateCities c where c.zip < " + (City.ZIP_START + 10));
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(0);
+
+ IntStream.range(0, 10).forEach(i -> replicateCities.put(i, new City(i)));
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(10);
+ }
+
+ @Test
+ public void removeIndexWhileClear() throws Exception {
+ // create cityZip index
+ server1.invoke("create index", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ QueryService queryService = cache.getQueryService();
+ Index cityZip = queryService.createIndex("cityZip", "c.zip", "/cities c");
+ assertThat(cityZip).isNotNull();
+ });
+
+ // remove index while clear
+ // removeIndex has to be invoked on each server. It's not distributed
+ AsyncInvocation removeIndex1 = server1.invokeAsync("remove index",
+ PRClearQueryIndexDUnitTest::removeCityZipIndex);
+ AsyncInvocation removeIndex2 = server2.invokeAsync("remove index",
+ PRClearQueryIndexDUnitTest::removeCityZipIndex);
+
+ cities.clear();
+ removeIndex1.await();
+ removeIndex2.await();
+
+ // make sure removeIndex and clear operations are successful
+ invokeInEveryMember(() -> {
+ InternalCache internalCache = ClusterStartupRule.getCache();
+ QueryService qs = internalCache.getQueryService();
+ Region region = internalCache.getRegion("cities");
+ assertThat(region.size()).isEqualTo(0);
+ // verify only 2 indexes created in the beginning of the tests exist
+ assertThat(qs.getIndexes(region)).extracting(Index::getName)
+ .containsExactlyInAnyOrder("cityId", "cityName");
+ }, server1, server2);
+ }
+
+ private static void removeCityZipIndex() {
+ Cache cache = ClusterStartupRule.getCache();
+ QueryService qs = cache.getQueryService();
+ Region<Object, Object> region = cache.getRegion("cities");
+ Index cityZip = qs.getIndex(region, "cityZip");
+ if (cityZip != null) {
+ qs.removeIndex(cityZip);
+ }
+ }
+
+ @Test
+ public void verifyQuerySucceedsAfterClear() throws Exception {
+ // put in some data
+ IntStream.range(0, 100).forEach(i -> cities.put(i, new City(i)));
+
+ QueryService queryService = clientCache.getQueryService();
+ Query query = queryService.newQuery(MUMBAI_QUERY);
+ Query query2 = queryService.newQuery(ID_10_QUERY);
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(50);
+ assertThat(((SelectResults) query2.execute()).size()).isEqualTo(1);
+
+ cities.clear();
+ invokeInEveryMember(() -> {
+ verifyIndexesAfterClear("cities", "cityId", "cityName");
+ }, server1, server2);
+
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(0);
+ assertThat(((SelectResults) query2.execute()).size()).isEqualTo(0);
+ }
+
+ private static void verifyIndexesAfterClear(String regionName, String... indexes) {
+ InternalCache internalCache = ClusterStartupRule.getCache();
+ QueryService qs = internalCache.getQueryService();
+ Region region = internalCache.getRegion(regionName);
+ assertThat(region.size()).isEqualTo(0);
+ for (String indexName : indexes) {
+ Index index = qs.getIndex(region, indexName);
+ IndexStatistics statistics = index.getStatistics();
+ assertThat(statistics.getNumberOfKeys()).isEqualTo(0);
+ assertThat(statistics.getNumberOfValues()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ public void concurrentClearAndQuery() {
+ QueryService queryService = clientCache.getQueryService();
+ Query query = queryService.newQuery(MUMBAI_QUERY);
+ Query query2 = queryService.newQuery(ID_10_QUERY);
+
+ IntStream.range(0, 100).forEach(i -> cities.put(i, new City(i)));
+
+ server1.invokeAsync(() -> {
+ Cache cache = ClusterStartupRule.getCache();
+ Region region = cache.getRegion("cities");
+ region.clear();
+ });
+
+ await().untilAsserted(() -> {
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(0);
+ assertThat(((SelectResults) query2.execute()).size()).isEqualTo(0);
+ });
+ }
+
+ @Test
+ public void concurrentClearAndPut() throws Exception {
+ AsyncInvocation puts = server1.invokeAsync(() -> {
+ Cache cache = ClusterStartupRule.getCache();
+ Region region = cache.getRegion("cities");
+ for (int i = 0; i < 1000; i++) {
+ // wait for gate to open
+ getBlackboard().waitForGate("proceedToPut", 60, TimeUnit.SECONDS);
+ region.put(i, new City(i));
+ }
+ });
+
+ AsyncInvocation clears = server2.invokeAsync(() -> {
+ Cache cache = ClusterStartupRule.getCache();
+ Region region = cache.getRegion("cities");
+ // do clear 10 times
+ for (int i = 0; i < 10; i++) {
+ try {
+ // don't allow put to proceed. It's like "close the gate"
+ getBlackboard().clearGate("proceedToPut");
+ region.clear();
+ verifyIndexesAfterClear("cities", "cityId", "cityName");
+ } finally {
+ // allow put to proceed. It's like "open the gate"
+ getBlackboard().signalGate("proceedToPut");
+ }
+ }
+ });
+
+ puts.await();
+ clears.await();
+ }
+
+ @Test
+ public void serverLeavingAndJoiningWhilePutAndClear() throws Exception {
+ int locatorPort = ClusterStartupRule.getDUnitLocatorPort();
+ Future<Void> startStopServer = executor.submit(() -> {
+ for (int i = 0; i < 3; i++) {
+ MemberVM server3 = cluster.startServerVM(3, s -> s.withConnectionToLocator(locatorPort)
+ .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*")
+ .withRegion(RegionShortcut.PARTITION, "cities"));
+ server3.stop(false);
+ }
+ });
+
+ Future<Void> putAndClear = executor.submit(() -> {
+ for (int i = 0; i < 30; i++) {
+ IntStream.range(0, 100).forEach(j -> cities.put(j, new City(j)));
+ try {
+ cities.clear();
+
+ // only verify if clear is successful
+ QueryService queryService = clientCache.getQueryService();
+ Query query = queryService.newQuery(MUMBAI_QUERY);
+ Query query2 = queryService.newQuery(ID_10_QUERY);
+ assertThat(((SelectResults) query.execute()).size()).isEqualTo(0);
+ assertThat(((SelectResults) query2.execute()).size()).isEqualTo(0);
+ } catch (ServerOperationException e) {
+ assertThat(e.getCause().getMessage())
+ .contains("Unable to clear all the buckets from the partitioned region cities")
+ .contains("either data (buckets) moved or member departed");
+ }
+ }
+ });
+ startStopServer.get(60, TimeUnit.SECONDS);
+ putAndClear.get(60, TimeUnit.SECONDS);
+ }
+
+ private static DUnitBlackboard getBlackboard() {
+ if (blackboard == null) {
+ blackboard = new DUnitBlackboard();
+ }
+ return blackboard;
+ }
+
+ @After
+ public void tearDown() {
+ invokeInEveryMember(() -> {
+ if (blackboard != null) {
+ blackboard.clearGate("proceedToPut");
+ }
+ // remove the cityZip index
+ removeCityZipIndex();
+ }, server1, server2);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
index 2930a3a..2895aaf 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/DefaultQueryService.java
@@ -213,7 +213,7 @@ public class DefaultQueryService implements InternalQueryService {
throw new UnsupportedOperationException(
"Index creation on the server is not supported from the client.");
}
- PartitionedIndex parIndex = null;
+
if (region == null) {
region = getRegionFromPath(imports, fromClause);
}
@@ -241,6 +241,7 @@ public class DefaultQueryService implements InternalQueryService {
}
}
if (region instanceof PartitionedRegion) {
+ PartitionedIndex parIndex = null;
try {
parIndex = (PartitionedIndex) ((PartitionedRegion) region).createIndex(false, indexType,
indexName, indexedExpression, fromClause, imports, loadEntries);
@@ -256,7 +257,6 @@ public class DefaultQueryService implements InternalQueryService {
return parIndex;
} else {
-
IndexManager indexManager = IndexUtils.getIndexManager(this.cache, region, true);
Index index = indexManager.createIndex(indexName, indexType, indexedExpression, fromClause,
imports, null, null, loadEntries);
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
index 5b2867b..0501603 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
@@ -275,6 +275,8 @@ public class IndexManager {
}
try {
+ ((LocalRegion) this.region).lockRVVForBulkOp();
+
String projectionAttributes = "*"; // for now this is the only option
if (getIndex(indexName) != null) {
@@ -425,7 +427,7 @@ public class IndexManager {
} finally {
this.cache.setPdxReadSerializedOverride(oldReadSerialized);
((TXManagerImpl) this.cache.getCacheTransactionManager()).unpauseTransaction(tx);
-
+ ((LocalRegion) this.region).unlockRVVForBulkOp();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index 4796a17..e8b01d8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -270,10 +270,13 @@ public class PartitionedRegionClear {
} while (true);
}
+ /**
+ * @return buckets that are cleared. empty set if any exception happened
+ */
protected Set<Integer> attemptToSendPartitionedRegionClearMessage(RegionEventImpl event,
PartitionedRegionClearMessage.OperationType op)
throws ForceReattemptException {
- Set<Integer> bucketsOperated = null;
+ Set<Integer> bucketsOperated = new HashSet<>();
if (partitionedRegion.getPRRoot() == null) {
if (logger.isDebugEnabled()) {
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
index c817f8c..8de2513 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
@@ -39,6 +39,7 @@ import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.DUnitEnv;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.SerializableConsumerIF;
@@ -97,6 +98,7 @@ public class ClusterStartupRule implements SerializableTestRule {
}
private final int vmCount;
+ private final boolean launchDunitLocator;
private final DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
@@ -106,11 +108,20 @@ public class ClusterStartupRule implements SerializableTestRule {
private boolean logFile = false;
public ClusterStartupRule() {
- this(NUM_VMS);
+ this(NUM_VMS, false);
}
public ClusterStartupRule(final int vmCount) {
+ this(vmCount, false);
+ }
+
+ public ClusterStartupRule(final boolean launchDunitLocator) {
+ this(NUM_VMS, launchDunitLocator);
+ }
+
+ public ClusterStartupRule(final int vmCount, boolean launchDunitLocator) {
this.vmCount = vmCount;
+ this.launchDunitLocator = launchDunitLocator;
}
public static ClientCache getClientCache() {
@@ -148,7 +159,7 @@ public class ClusterStartupRule implements SerializableTestRule {
// GEODE-6247: JDK 11 has an issue where native code is reporting committed is 2MB > max.
IgnoredException.addIgnoredException("committed = 538968064 should be < max = 536870912");
}
- DUnitLauncher.launchIfNeeded(false);
+ DUnitLauncher.launchIfNeeded(launchDunitLocator);
for (int i = 0; i < vmCount; i++) {
Host.getHost(0).getVM(i);
}
@@ -156,6 +167,14 @@ public class ClusterStartupRule implements SerializableTestRule {
occupiedVMs = new HashMap<>();
}
+ /**
+ * Returns the port that the standard dunit locator is listening on.
+ */
+ public static int getDUnitLocatorPort() {
+ return DUnitEnv.get().getLocatorPort();
+ }
+
+
private void after(Description description) throws Throwable {
if (!skipLocalDistributedSystemCleanup) {
diff --git a/geode-junit/src/main/java/org/apache/geode/cache/query/data/City.java b/geode-junit/src/main/java/org/apache/geode/cache/query/data/City.java
index e7e7b39..622bf5d 100644
--- a/geode-junit/src/main/java/org/apache/geode/cache/query/data/City.java
+++ b/geode-junit/src/main/java/org/apache/geode/cache/query/data/City.java
@@ -24,6 +24,8 @@ package org.apache.geode.cache.query.data;
import java.io.Serializable;
public class City implements Serializable {
+ public static int ZIP_START = 300000;
+ public int id;
public String name;
public int zip;
@@ -37,7 +39,8 @@ public class City implements Serializable {
String arr1[] = {"MUMBAI", "PUNE", "GANDHINAGAR", "CHANDIGARH"};
/* this is for the test to have 50% of the objects belonging to one city */
this.name = arr1[i % 2];
- this.zip = 425125 + i;
+ this.zip = ZIP_START + i;
+ this.id = i;
}// end of constructor 2
////////////////////////////