You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/12/01 18:53:58 UTC

[geode] 26/26: GEODE-7675: Partitioned Region clear should be successful when clients are present with subscription enabled (#5727)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8ddd865ee40958642a5f53039a45afe2f587c924
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Tue Nov 10 18:30:04 2020 -0800

    GEODE-7675: Partitioned Region clear should be successful when clients are present with subscription enabled (#5727)
---
 .../internal/cache/PartitionRegionClearHATest.java | 236 +++++++++++++++++++++
 .../test/concurrent/FileBasedCountDownLatch.java   |   2 +-
 2 files changed, 237 insertions(+), 1 deletion(-)

diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
new file mode 100644
index 0000000..7d0db0a
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.test.concurrent.FileBasedCountDownLatch;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+public class PartitionRegionClearHATest implements Serializable {
+  public static final String NAME = "testRegion";
+  protected static MemberVM locator;
+  protected static MemberVM server1, server2;
+  protected static ClientVM client1, client2;
+  protected static List<MemberVM> servers;
+
+  @ClassRule
+  public static ClusterStartupRule cluster = new ClusterStartupRule(5);
+
+  private static AtomicBoolean clearEventReceived = new AtomicBoolean(false);
+  private FileBasedCountDownLatch latch;
+
+  @BeforeClass
+  // setup once for all the tests: start up one locator, two servers and two clients.
+  // create the regions on all servers and clients. client1 has registered interests. client2
+  // has CQ
+  public static void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    int locatorPort = locator.getPort();
+    server1 = cluster.startServerVM(1,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    server2 = cluster.startServerVM(2,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    servers = Arrays.asList(server1, server2);
+
+    client1 = cluster.startClientVM(3, cc -> {
+      cc.withLocatorConnection(locatorPort).withPoolSubscription(true);
+    });
+
+    client2 = cluster.startClientVM(4, cc -> {
+      cc.withLocatorConnection(locatorPort).withPoolSubscription(true);
+    });
+
+    // set client1 with a registered interest
+    client1.invoke(() -> {
+      Region<Object, Object> testRegion = ClusterStartupRule.getClientCache()
+          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+          .create(NAME);
+      testRegion.registerInterestForAllKeys();
+    });
+
+    // set client2 with a CQ
+    client2.invoke(() -> {
+      ClusterStartupRule.getClientCache()
+          .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+          .create(NAME);
+
+      QueryService queryService =
+          ClusterStartupRule.getClientCache().getDefaultPool().getQueryService();
+      CqAttributesFactory cqaFactory = new CqAttributesFactory();
+      cqaFactory.addCqListener(new CqListener() {
+        @Override
+        public void onEvent(CqEvent aCqEvent) {
+          Operation baseOperation = aCqEvent.getBaseOperation();
+          if (baseOperation.isClear()) {
+            clearEventReceived.set(true);
+          }
+        }
+
+        @Override
+        public void onError(CqEvent aCqEvent) {}
+      });
+
+      CqQuery cqQuery =
+          queryService.newCq("select * from /" + NAME, cqaFactory.create());
+      cqQuery.execute();
+    });
+  }
+
+  @Before
+  // before each test, initialize and set the DistributionMessageObserver and input some data into
+  // the region
+  public void before() throws Exception {
+    latch = new FileBasedCountDownLatch(2);
+    MemberVM.invokeInEveryMember(() -> {
+      PauseDuringClearDistributionMessageObserver observer =
+          new PauseDuringClearDistributionMessageObserver();
+      observer.setLatch(latch);
+      DistributionMessageObserver.setInstance(observer);
+    }, server1, server2);
+
+    client1.invoke(() -> {
+      Region<Object, Object> testRegion = getClientRegion();
+      for (int i = 0; i < 10; i++) {
+        testRegion.put(i, "value" + i);
+      }
+    });
+  }
+
+  @After
+  // after each test, reset the boolean variable and clear the DistributionMessageObserver
+  public void after() throws Exception {
+    clearEventReceived.set(false);
+    MemberVM.invokeInEveryMember(() -> {
+      DistributionMessageObserver.setInstance(null);
+    }, server1, server2);
+  }
+
+  @Test
+  public void restartServerThatIssuesClear() throws Exception {
+    // sets the latch and issue the clear
+    server1.invokeAsync(() -> {
+      getRegion().clear();
+    });
+
+    // only restart the server1 until we have reached the observer code
+    await().until(() -> latch.currentValue() == 1);
+
+    // restart server1
+    server1.stop(false);
+    int locatorPort = locator.getPort();
+    server1 = cluster.startServerVM(1,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+
+    latch.countDown();
+    verifyCleared();
+  }
+
+  @Test
+  public void restartServerThatDoesNotIssueClear() throws Exception {
+    server1.invokeAsync(() -> {
+      getRegion().clear();
+    });
+
+    // only restart the server2 until we have reached the observer code
+    await().until(() -> latch.currentValue() == 1);
+
+    // restart server2
+    server2.stop(false);
+    int locatorPort = locator.getPort();
+    server2 = cluster.startServerVM(2,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+
+    latch.countDown();
+    verifyCleared();
+  }
+
+  private void verifyCleared() {
+    client1.invoke(() -> {
+      Region<Object, Object> testRegion = getClientRegion();
+      await().untilAsserted(() -> assertThat(testRegion.size()).isEqualTo(0));
+    });
+
+    client2.invoke(() -> {
+      assertThat(clearEventReceived.get()).isTrue();
+      assertThat(getClientRegion().size()).isEqualTo(0);
+    });
+
+    MemberVM.invokeInEveryMember(() -> {
+      assertThat(getRegion().size()).isEqualTo(0);
+    }, server1, server2);
+  }
+
+  private static Region<Object, Object> getClientRegion() {
+    return ClusterStartupRule.getClientCache().getRegion("/" + NAME);
+  }
+
+  private static Region<Object, Object> getRegion() {
+    return ClusterStartupRule.getCache().getRegion("/" + NAME);
+  }
+
+  private static class PauseDuringClearDistributionMessageObserver
+      extends DistributionMessageObserver {
+    private FileBasedCountDownLatch latch;
+
+    public void setLatch(FileBasedCountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearMessage) {
+        PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
+        if (clearMessage
+            .getOp() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+          try {
+            // count down to 1 so that we can go ahead and restart the server
+            latch.countDown();
+            // go ahead until the count is 0 (someone else must call countdown one more time
+            latch.await();
+          } catch (Exception ex) {
+          }
+        }
+      }
+    }
+  }
+
+}
diff --git a/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java b/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java
index 0f77d4d..b928d62 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/concurrent/FileBasedCountDownLatch.java
@@ -73,7 +73,7 @@ public class FileBasedCountDownLatch implements Serializable {
     GeodeAwaitility.await().until(this::currentValue, is(equalTo(0)));
   }
 
-  protected int currentValue() throws IOException {
+  public int currentValue() throws IOException {
     try (FileOutputStream out = new FileOutputStream(lockFile)) {
       java.nio.channels.FileLock lock = out.getChannel().lock();
       try {