You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/03/26 20:44:36 UTC

[geode] branch support/1.12 updated: GEODE-9030: Modified PartitionedIndex to reset arbitraryBucketIndex (#6188)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 83cb3d4  GEODE-9030: Modified PartitionedIndex to reset arbitraryBucketIndex (#6188)
83cb3d4 is described below

commit 83cb3d4a4a8dd71529b56f3dbb39f3871469ff45
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Fri Mar 26 10:41:48 2021 -1000

    GEODE-9030: Modified PartitionedIndex to reset arbitraryBucketIndex (#6188)
    
    (cherry picked from commit d7342cd0266010313a245920dee8e82034593608)
---
 ...nequalityQueryWithRebalanceDistributedTest.java | 172 +++++++++++++++++++++
 .../internal/index/PartitionedIndexJUnitTest.java  |  65 +++++++-
 .../query/internal/index/PartitionedIndex.java     |   6 +-
 3 files changed, 235 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/InequalityQueryWithRebalanceDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/InequalityQueryWithRebalanceDistributedTest.java
new file mode 100644
index 0000000..41ea155
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/dunit/InequalityQueryWithRebalanceDistributedTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.Serializable;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.cache.Region;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.OQLIndexTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category({OQLIndexTest.class})
+public class InequalityQueryWithRebalanceDistributedTest implements Serializable {
+
+  private String regionName;
+
+  private static MemberVM locator;
+
+  @ClassRule
+  public static GfshCommandRule gfsh = new GfshCommandRule();
+
+  @ClassRule
+  public static ClusterStartupRule cluster = new ClusterStartupRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    locator = cluster.startLocatorVM(0); // locator runs in VM 0 and is kept in a static field
+    gfsh.connectAndVerify(locator); // connect the shared gfsh rule to that locator
+  }
+
+  @Before
+  public void before() throws Exception {
+    regionName = String.join("_", getClass().getSimpleName(), testName.getMethodName(), "region");
+  }
+
+  @Test
+  public void testArbitraryBucketIndexUpdatedAfterBucketMoved() {
+    // Start server1; it is the only server until server2 is started further down
+    MemberVM server1 = cluster.startServerVM(1, locator.getPort());
+
+    // Create the two-bucket partitioned region and wait for it on that one server
+    createRegion();
+
+    // Create the tradeStatus index before any entries are loaded
+    createIndex();
+
+    // Load ten entries from within server1
+    server1.invoke(this::loadRegion);
+
+    // Start server2 so that the region has a second member
+    cluster.startServerVM(2, locator.getPort());
+
+    // Wait for server2 to create the region MBean
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers(SEPARATOR + regionName, 2);
+
+    // Invoke rebalance (intended to move a bucket, per the test name)
+    rebalance();
+
+    // Execute the inequality query and assert that the gfsh command succeeds
+    executeQuery();
+  }
+
+  private void createRegion() { // PARTITION region, 2 buckets, IsolatingPartitionResolver
+    gfsh.executeAndAssertThat("create region --name=" + regionName
+        + " --type=PARTITION  --total-num-buckets=2 --partition-resolver="
+        + IsolatingPartitionResolver.class.getName()).statusIsSuccess();
+    locator.waitUntilRegionIsReadyOnExactlyThisManyServers(SEPARATOR + regionName, 1);
+  }
+
+  private void createIndex() { // same expression as the query: tradeStatus.toString()
+    gfsh.executeAndAssertThat("create index --region=" + regionName
+        + " --name=tradeStatus --expression=tradeStatus.toString()").statusIsSuccess();
+  }
+
+  private void loadRegion() { // puts keys 0-9; even keys are OPEN, odd keys are CLOSED
+    Region<Integer, Trade> region = ClusterStartupRule.getCache().getRegion(regionName);
+    IntStream.range(0, 10).forEach(i -> region.put(i,
+        new Trade(String.valueOf(i), "aId1", i % 2 == 0 ? TradeStatus.OPEN : TradeStatus.CLOSED)));
+  }
+
+  private void rebalance() { // runs the gfsh "rebalance" command and asserts success
+    gfsh.executeAndAssertThat("rebalance").statusIsSuccess();
+  }
+
+  private void executeQuery() { // only the command status is asserted, not the returned rows
+    gfsh.executeAndAssertThat("query --query=\"SELECT * FROM " + SEPARATOR + regionName
+        + " WHERE arrangementId = 'aId1' AND tradeStatus.toString() != 'CLOSED'\"")
+        .statusIsSuccess();
+  }
+
+  public enum TradeStatus { // the index and the query both use tradeStatus.toString()
+    OPEN,
+    CLOSED
+  }
+
+  public static class Trade implements PdxSerializable {
+    // Public fields: the test's index expression and query refer to them by name
+    public String id;
+
+    public String arrangementId;
+
+    public TradeStatus tradeStatus;
+    // Leaves every field null; fromData assigns them
+    public Trade() {}
+
+    public Trade(String id, String arrangementId, TradeStatus tradeStatus) {
+      this.tradeStatus = tradeStatus;
+      this.arrangementId = arrangementId;
+      this.id = id;
+    }
+
+    @Override
+    public void toData(PdxWriter writer) {
+      writer.writeString("id", id)
+          .writeString("arrangementId", arrangementId)
+          .writeObject("tradeStatus", tradeStatus);
+    }
+
+    @Override
+    public void fromData(PdxReader reader) {
+      id = reader.readString("id");
+      arrangementId = reader.readString("arrangementId");
+      tradeStatus = (TradeStatus) reader.readObject("tradeStatus");
+    }
+  }
+
+  public static class IsolatingPartitionResolver implements PartitionResolver<Integer, Trade> {
+    public IsolatingPartitionResolver() {}
+
+    @Override
+    public Object getRoutingObject(EntryOperation<Integer, Trade> operation) {
+      return operation.getKey() == 0 ? 0 : 1; // key 0 -> bucket 0, every other key -> bucket 1
+    }
+
+    @Override
+    public String getName() {
+      return getClass().getSimpleName();
+    }
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java
index d131092..5d5ddaa 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/index/PartitionedIndexJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.cache.query.internal.index;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -23,6 +24,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.Index;
 import org.apache.geode.cache.query.IndexType;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
@@ -37,13 +39,7 @@ public class PartitionedIndexJUnitTest {
     final int DATA_SIZE_TO_BE_POPULATED = 10000;
     final int THREAD_POOL_SIZE = 20;
 
-    Region region = mock(Region.class);
-    InternalCache cache = mock(InternalCache.class);
-    when(region.getCache()).thenReturn(cache);
-    DistributedSystem distributedSystem = mock(DistributedSystem.class);
-    when(cache.getDistributedSystem()).thenReturn(distributedSystem);
-    PartitionedIndex partitionedIndex = new PartitionedIndex(cache, IndexType.FUNCTIONAL,
-        "dummyString", region, "dummyString", "dummyString", "dummyString");
+    PartitionedIndex partitionedIndex = createPartitionedIndex();
     Runnable populateSetTask = () -> {
       for (int i = 0; i < DATA_SIZE_TO_BE_POPULATED; i++) {
         partitionedIndex.mapIndexKeys.add("" + i);
@@ -64,6 +60,61 @@ public class PartitionedIndexJUnitTest {
     }
 
     assertEquals(DATA_SIZE_TO_BE_POPULATED, partitionedIndex.mapIndexKeys.size());
+  }
+
+  @Test
+  public void verifyAddToAndRemoveFromBucketIndexesUpdatesArbitraryBucketIndexOneIndexCase() {
+    // Mocks standing in for a bucket region and the index built on it
+    Region bucketRegion = mock(Region.class);
+    Index bucketIndex = mock(Index.class);
+
+    // The PartitionedIndex under test
+    PartitionedIndex underTest = createPartitionedIndex();
+
+    // Register the only bucket index
+    underTest.addToBucketIndexes(bucketRegion, bucketIndex);
+
+    // It must now be the arbitrary bucket index
+    assertThat(underTest.getBucketIndex()).isEqualTo(bucketIndex);
+
+    // Unregister it again
+    underTest.removeFromBucketIndexes(bucketRegion, bucketIndex);
+
+    // With no bucket indexes left, the arbitrary bucket index must be cleared
+    assertThat(underTest.getBucketIndex()).isNull();
+  }
 
+  @Test
+  public void verifyAddToAndRemoveFromBucketIndexesUpdatesArbitraryBucketIndexTwoIndexesCase() {
+    // The PartitionedIndex under test
+    PartitionedIndex underTest = createPartitionedIndex();
+
+    // One mock region and two mock indexes registered against it
+    Region bucketRegion = mock(Region.class);
+    Index firstIndex = mock(Index.class);
+    Index secondIndex = mock(Index.class);
+
+    // Register both indexes, firstIndex before secondIndex
+    underTest.addToBucketIndexes(bucketRegion, firstIndex);
+    underTest.addToBucketIndexes(bucketRegion, secondIndex);
+
+    // The index registered first is the arbitrary bucket index
+    assertThat(underTest.getBucketIndex()).isEqualTo(firstIndex);
+
+    // Unregister the index that is currently the arbitrary bucket index
+    underTest.removeFromBucketIndexes(bucketRegion, firstIndex);
+
+    // The remaining index must take over as the arbitrary bucket index
+    assertThat(underTest.getBucketIndex()).isEqualTo(secondIndex);
+  }
+
+  private PartitionedIndex createPartitionedIndex() { // mocks: region -> cache -> system
+    DistributedSystem mockSystem = mock(DistributedSystem.class);
+    InternalCache mockCache = mock(InternalCache.class);
+    when(mockCache.getDistributedSystem()).thenReturn(mockSystem);
+    Region mockRegion = mock(Region.class);
+    when(mockRegion.getCache()).thenReturn(mockCache);
+    return new PartitionedIndex(mockCache, IndexType.FUNCTIONAL,
+        "dummyString", mockRegion, "dummyString", "dummyString", "dummyString");
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
index 3400d9a..6b62776 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/PartitionedIndex.java
@@ -132,7 +132,7 @@ public class PartitionedIndex extends AbstractIndex {
         }
       }
       if (index == arbitraryBucketIndex) {
-        setArbitraryBucketIndex(retrieveArbitraryBucketIndex());
+        resetArbitraryBucketIndex(retrieveArbitraryBucketIndex());
       }
     }
   }
@@ -184,6 +184,10 @@ public class PartitionedIndex extends AbstractIndex {
     }
   }
 
+  private void resetArbitraryBucketIndex(Index index) { // plain overwrite of the field
+    arbitraryBucketIndex = index; // the caller passes retrieveArbitraryBucketIndex()'s result
+  }
+
   public Index retrieveArbitraryBucketIndex() {
     Index index = null;
     synchronized (this.bucketIndexes) {