You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2021/03/16 15:33:59 UTC
[geode] branch develop updated: GEODE-9030: Modified PartitionedIndex to reset arbitraryBucketIndex
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d7342cd GEODE-9030: Modified PartitionedIndex to reset arbitraryBucketIndex
d7342cd is described below
commit d7342cd0266010313a245920dee8e82034593608
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Tue Mar 16 08:32:51 2021 -0700
GEODE-9030: Modified PartitionedIndex to reset arbitraryBucketIndex
---
...nequalityQueryWithRebalanceDistributedTest.java | 172 +++++++++++++++++++++
.../internal/index/PartitionedIndexJUnitTest.java | 65 +++++++-
.../query/internal/index/PartitionedIndex.java | 6 +-
3 files changed, 235 insertions(+), 8 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/InequalityQueryWithRebalanceDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/InequalityQueryWithRebalanceDistributedTest.java
new file mode 100644
index 0000000..41ea155
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/InequalityQueryWithRebalanceDistributedTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.Serializable;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.cache.Region;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.OQLIndexTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Distributed test for GEODE-9030: runs an inequality query that can use an index on a
+ * partitioned region after a second server has joined and a rebalance has been requested.
+ *
+ * <p>
+ * The region has two buckets and uses {@link IsolatingPartitionResolver}, which gives key 0 a
+ * different routing object than every other key. The test only asserts that each gfsh command,
+ * including the final query, reports success.
+ */
+@Category({OQLIndexTest.class})
+public class InequalityQueryWithRebalanceDistributedTest implements Serializable {
+
+  /** Name of the region used by the current test method; assigned in {@link #before()}. */
+  private String regionName;
+
+  /** Locator shared by all test methods; started once in {@link #beforeClass()}. */
+  private static MemberVM locator;
+
+  @ClassRule
+  public static GfshCommandRule gfsh = new GfshCommandRule();
+
+  @ClassRule
+  public static ClusterStartupRule cluster = new ClusterStartupRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  /** Starts the locator in VM 0 and connects gfsh to it. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    gfsh.connectAndVerify(locator);
+  }
+
+  /** Derives a region name from the test class and the running test method. */
+  @Before
+  public void before() throws Exception {
+    regionName = getClass().getSimpleName() + "_" + testName.getMethodName() + "_region";
+  }
+
+  /**
+   * Creates and loads an indexed partitioned region on one server, adds a second server,
+   * rebalances, and then checks that the inequality query still executes successfully.
+   */
+  @Test
+  public void testArbitraryBucketIndexUpdatedAfterBucketMoved() {
+    // Start server1
+    MemberVM server1 = cluster.startServerVM(1, locator.getPort());
+
+    // Create the region
+    createRegion();
+
+    // Create the index
+    createIndex();
+
+    // Load entries
+    server1.invoke(this::loadRegion);
+
+    // Start server2
+    cluster.startServerVM(2, locator.getPort());
+
+    // Wait for server2 to create the region MBean
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers(SEPARATOR + regionName, 2);
+
+    // Invoke rebalance
+    rebalance();
+
+    // Execute query
+    executeQuery();
+  }
+
+  /**
+   * Creates a two-bucket PARTITION region that uses {@link IsolatingPartitionResolver}, then waits
+   * until the locator reports the region on exactly one server.
+   */
+  private void createRegion() {
+    gfsh.executeAndAssertThat("create region --name=" + regionName
+        + " --type=PARTITION --total-num-buckets=2 --partition-resolver="
+        + IsolatingPartitionResolver.class.getName()).statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers(SEPARATOR + regionName, 1);
+  }
+
+  /** Creates the {@code tradeStatus} index on {@code tradeStatus.toString()} through gfsh. */
+  private void createIndex() {
+    gfsh.executeAndAssertThat("create index --region=" + regionName
+        + " --name=tradeStatus --expression=tradeStatus.toString()").statusIsSuccess();
+  }
+
+  /**
+   * Puts ten trades with keys 0 through 9; even keys are OPEN and odd keys are CLOSED. Invoked
+   * inside a server VM, so the cache is obtained from {@link ClusterStartupRule#getCache()}.
+   */
+  private void loadRegion() {
+    Region<Integer, Trade> region = ClusterStartupRule.getCache().getRegion(regionName);
+    IntStream.range(0, 10).forEach(i -> region.put(i,
+        new Trade(String.valueOf(i), "aId1", i % 2 == 0 ? TradeStatus.OPEN : TradeStatus.CLOSED)));
+  }
+
+  /** Runs the gfsh {@code rebalance} command and asserts that it succeeds. */
+  private void rebalance() {
+    gfsh.executeAndAssertThat("rebalance").statusIsSuccess();
+  }
+
+  /**
+   * Runs the query combining an equality on {@code arrangementId} with an inequality on
+   * {@code tradeStatus.toString()} and asserts only that the command succeeds.
+   */
+  private void executeQuery() {
+    gfsh.executeAndAssertThat("query --query=\"SELECT * FROM " + SEPARATOR + regionName
+        + " WHERE arrangementId = 'aId1' AND tradeStatus.toString() != 'CLOSED'\"")
+        .statusIsSuccess();
+  }
+
+  /** Status values stored in {@link Trade#tradeStatus}. */
+  public enum TradeStatus {
+    OPEN,
+    CLOSED
+  }
+
+  /**
+   * PDX-serializable value type stored in the region. The field names {@code arrangementId} and
+   * {@code tradeStatus} are the ones referenced by the index expression and the query above.
+   */
+  public static class Trade implements PdxSerializable {
+
+    public String id;
+
+    public String arrangementId;
+
+    public TradeStatus tradeStatus;
+
+    /** No-arg constructor; presumably required for PDX deserialization - TODO confirm. */
+    public Trade() {}
+
+    public Trade(String id, String arrangementId, TradeStatus tradeStatus) {
+      this.id = id;
+      this.arrangementId = arrangementId;
+      this.tradeStatus = tradeStatus;
+    }
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeString("id", this.id);
+      writer.writeString("arrangementId", this.arrangementId);
+      writer.writeObject("tradeStatus", this.tradeStatus);
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      this.id = reader.readString("id");
+      this.arrangementId = reader.readString("arrangementId");
+      this.tradeStatus = (TradeStatus) reader.readObject("tradeStatus");
+    }
+  }
+
+  /**
+   * Partition resolver that returns routing object 0 for key 0 and routing object 1 for every
+   * other key. It is referenced by class name in the gfsh {@code create region} command.
+   */
+  public static class IsolatingPartitionResolver implements PartitionResolver<Integer, Trade> {
+
+    public IsolatingPartitionResolver() {}
+
+    @Override
+    public Object getRoutingObject(EntryOperation<Integer, Trade> operation) {
+      // Route key=0 to bucket 0 and all other keys to bucket 1
+      return operation.getKey() == 0 ? 0 : 1;
+    }
+
+    @Override
+    public String getName() {
+      return getClass().getSimpleName();
+    }
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java
index d131092..5d5ddaa 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.query.internal.index;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@@ -23,6 +24,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.IndexType;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.InternalCache;
@@ -37,13 +39,7 @@ public class PartitionedIndexJUnitTest {
final int DATA_SIZE_TO_BE_POPULATED = 10000;
final int THREAD_POOL_SIZE = 20;
- Region region = mock(Region.class);
- InternalCache cache = mock(InternalCache.class);
- when(region.getCache()).thenReturn(cache);
- DistributedSystem distributedSystem = mock(DistributedSystem.class);
- when(cache.getDistributedSystem()).thenReturn(distributedSystem);
- PartitionedIndex partitionedIndex = new PartitionedIndex(cache, IndexType.FUNCTIONAL,
- "dummyString", region, "dummyString", "dummyString", "dummyString");
+ PartitionedIndex partitionedIndex = createPartitionedIndex();
Runnable populateSetTask = () -> {
for (int i = 0; i < DATA_SIZE_TO_BE_POPULATED; i++) {
partitionedIndex.mapIndexKeys.add("" + i);
@@ -64,6 +60,61 @@ public class PartitionedIndexJUnitTest {
}
assertEquals(DATA_SIZE_TO_BE_POPULATED, partitionedIndex.mapIndexKeys.size());
+ }
+
+  /**
+   * Adds a single mock bucket index and then removes it again, checking the value reported by
+   * {@code getBucketIndex()} after each step.
+   */
+  @Test
+  public void verifyAddToAndRemoveFromBucketIndexesUpdatesArbitraryBucketIndexOneIndexCase() {
+    // Mock collaborators: the bucket region and the index registered for it
+    Index bucketIndex = mock(Index.class);
+    Region bucketRegion = mock(Region.class);
+
+    // The PartitionedIndex under test
+    PartitionedIndex indexUnderTest = createPartitionedIndex();
+
+    // After the add, the only bucket index must be the one reported
+    indexUnderTest.addToBucketIndexes(bucketRegion, bucketIndex);
+    assertThat(indexUnderTest.getBucketIndex()).isEqualTo(bucketIndex);
+
+    // After the remove, no bucket index is left to report
+    indexUnderTest.removeFromBucketIndexes(bucketRegion, bucketIndex);
+    assertThat(indexUnderTest.getBucketIndex()).isNull();
+  }
+  /**
+   * Adds two mock bucket indexes for the same mock region, removes the first one, and checks that
+   * {@code getBucketIndex()} reports the first index before the removal and the second after it.
+   */
+  @Test
+  public void verifyAddToAndRemoveFromBucketIndexesUpdatesArbitraryBucketIndexTwoIndexesCase() {
+    // Mock collaborators: one bucket region and two indexes registered for it
+    Index firstIndex = mock(Index.class);
+    Index secondIndex = mock(Index.class);
+    Region bucketRegion = mock(Region.class);
+
+    // The PartitionedIndex under test
+    PartitionedIndex indexUnderTest = createPartitionedIndex();
+
+    // Register both indexes, first one first
+    indexUnderTest.addToBucketIndexes(bucketRegion, firstIndex);
+    indexUnderTest.addToBucketIndexes(bucketRegion, secondIndex);
+
+    // The index added first is the one reported
+    assertThat(indexUnderTest.getBucketIndex()).isEqualTo(firstIndex);
+
+    // Removing the reported index must make the remaining one take its place
+    indexUnderTest.removeFromBucketIndexes(bucketRegion, firstIndex);
+    assertThat(indexUnderTest.getBucketIndex()).isEqualTo(secondIndex);
+  }
+
+  /**
+   * Builds a {@link PartitionedIndex} of type {@code FUNCTIONAL} on top of Mockito mocks: a mock
+   * region whose {@code getCache()} returns a mock cache, which in turn returns a mock
+   * distributed system. All string arguments are the placeholder {@code "dummyString"}.
+   */
+  private PartitionedIndex createPartitionedIndex() {
+    // Wire the mocks from the innermost dependency outwards
+    DistributedSystem mockSystem = mock(DistributedSystem.class);
+    InternalCache mockCache = mock(InternalCache.class);
+    when(mockCache.getDistributedSystem()).thenReturn(mockSystem);
+
+    Region mockRegion = mock(Region.class);
+    when(mockRegion.getCache()).thenReturn(mockCache);
+
+    PartitionedIndex created = new PartitionedIndex(mockCache, IndexType.FUNCTIONAL,
+        "dummyString", mockRegion, "dummyString", "dummyString", "dummyString");
+    return created;
   }
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
index 4300735..018baa1 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
@@ -132,7 +132,7 @@ public class PartitionedIndex extends AbstractIndex {
}
}
if (index == arbitraryBucketIndex) {
- setArbitraryBucketIndex(retrieveArbitraryBucketIndex());
+ resetArbitraryBucketIndex(retrieveArbitraryBucketIndex());
}
}
}
@@ -184,6 +184,10 @@ public class PartitionedIndex extends AbstractIndex {
}
}
+  /**
+   * Overwrites the {@code arbitraryBucketIndex} field with the given index. The assignment is
+   * unconditional, so whatever value the field held before is discarded.
+   *
+   * @param index the value to store in {@code arbitraryBucketIndex}
+   */
+  private void resetArbitraryBucketIndex(Index index) {
+    this.arbitraryBucketIndex = index;
+  }
+
public Index retrieveArbitraryBucketIndex() {
Index index = null;
synchronized (this.bucketIndexes) {