You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/29 00:03:51 UTC

[geode] branch feature/GEODE-7665 updated: GEODE-7858: PR.clear notify client should let the queue holder member to notify (#5677)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
     new 4060421  GEODE-7858: PR.clear notify client should let the queue holder member to notify (#5677)
4060421 is described below

commit 406042179bab4e3907315bf2a783ade9534c585d
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Wed Oct 28 17:02:53 2020 -0700

    GEODE-7858: PR.clear notify client should let the queue holder member to notify (#5677)
---
 .../PartitionRegionClearMessageQueueDUnitTest.java | 165 +++++++++++++++++++++
 1 file changed, 165 insertions(+)

diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearMessageQueueDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearMessageQueueDUnitTest.java
new file mode 100644
index 0000000..db7c439
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearMessageQueueDUnitTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ClientCacheRule;
+
+public class PartitionRegionClearMessageQueueDUnitTest {
+  public static final String NAME = "testRegion";
+  protected static MemberVM locator;
+  protected static MemberVM server1, server2, server3, accessor;
+  protected static List<MemberVM> servers;
+
+  @ClassRule
+  public static ClusterStartupRule cluster = new ClusterStartupRule(5);
+
+  @ClassRule
+  public static ClientCacheRule client = new ClientCacheRule();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    int locatorPort = locator.getPort();
+    server1 = cluster.startServerVM(1,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    server2 = cluster.startServerVM(2,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    server3 = cluster.startServerVM(3,
+        s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+    accessor = cluster.startServerVM(4, s -> s.withConnectionToLocator(locatorPort)
+        .withRegion(RegionShortcut.PARTITION_PROXY, NAME));
+
+    servers = Arrays.asList(server1, server2, server3, accessor);
+
+    client.withLocatorConnection(locatorPort).withPoolSubscription(true).createCache();
+    client.getCache()
+        .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(NAME);
+  }
+
+  @Test
+  public void clearMessageSentToClientWithRegisteredInterest() throws Exception {
+    Region<Object, Object> testRegion = client.getCache().getRegion(NAME);
+
+    assertThat(testRegion).isEmpty();
+    server1.invoke(() -> addRecord(10));
+    // verify that before register for interest, client region has no data
+    assertThat(testRegion).hasSize(0);
+
+    // verify that after register for interest, client region has data now.
+    testRegion.registerInterestForAllKeys();
+    assertThat(testRegion).hasSize(10);
+
+    // do PR clear on a server that has no client proxy, hence no HARegionQueue
+    Boolean cleared = false;
+    for (int i = 0; i < servers.size(); i++) {
+      cleared = servers.get(i).invoke(
+          PartitionRegionClearMessageQueueDUnitTest::clearRegionOnNoClientProxyMember);
+      if (cleared) {
+        break;
+      }
+    }
+
+    // verify that clear is called on a server
+    assertThat(cleared).isTrue();
+    // verify that the PR clear message is still delivered to the client
+    assertThat(testRegion).hasSize(0);
+  }
+
+  private static void addRecord(int size) {
+    Region<Object, Object> region = getTestRegion();
+    for (int i = 0; i < size; i++) {
+      region.put(i, "value" + i);
+    }
+  }
+
+  @Test
+  public void clearMessageSentToClientWithCQ() throws Exception {
+    QueryService queryService = client.getCache().getDefaultPool().getQueryService();
+    CqAttributesFactory cqaFactory = new CqAttributesFactory();
+    AtomicBoolean clearEventReceived = new AtomicBoolean(false);
+    cqaFactory.addCqListener(new CqListener() {
+      @Override
+      public void onEvent(CqEvent aCqEvent) {
+        Operation baseOperation = aCqEvent.getBaseOperation();
+        if (baseOperation.isClear()) {
+          clearEventReceived.set(true);
+        }
+      }
+
+      @Override
+      public void onError(CqEvent aCqEvent) {}
+    });
+
+    CqQuery cqQuery =
+        queryService.newCq("select * from /" + NAME, cqaFactory.create());
+    cqQuery.execute();
+
+    server1.invoke(() -> addRecord(10));
+
+    Boolean cleared = false;
+    for (int i = 0; i < servers.size(); i++) {
+      cleared = servers.get(i).invoke(
+          PartitionRegionClearMessageQueueDUnitTest::clearRegionOnNoClientProxyMember);
+      if (cleared) {
+        break;
+      }
+    }
+
+    // verify that clear is called on a server
+    assertThat(cleared).isTrue();
+    // verify that the PR clear message is still delivered to the client
+    assertThat(clearEventReceived.get()).isTrue();
+  }
+
+  private static boolean clearRegionOnNoClientProxyMember() {
+    Collection<CacheClientProxy> clientProxies =
+        CacheClientNotifier.getInstance().getClientProxies();
+    if (clientProxies.isEmpty()) {
+      getTestRegion().clear();
+      return true;
+    }
+    return false;
+  }
+
+  private static Region<Object, Object> getTestRegion() {
+    return getCache().getRegion("/" + NAME);
+  }
+}