You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/29 00:03:51 UTC
[geode] branch feature/GEODE-7665 updated: GEODE-7858: PR.clear
notify client should let the queue holder member to notify (#5677)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
new 4060421 GEODE-7858: PR.clear notify client should let the queue holder member to notify (#5677)
4060421 is described below
commit 406042179bab4e3907315bf2a783ade9534c585d
Author: Jinmei Liao <ji...@pivotal.io>
AuthorDate: Wed Oct 28 17:02:53 2020 -0700
GEODE-7858: PR.clear notify client should let the queue holder member to notify (#5677)
---
.../PartitionRegionClearMessageQueueDUnitTest.java | 165 +++++++++++++++++++++
1 file changed, 165 insertions(+)
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearMessageQueueDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearMessageQueueDUnitTest.java
new file mode 100644
index 0000000..db7c439
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearMessageQueueDUnitTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ClientCacheRule;
+
+public class PartitionRegionClearMessageQueueDUnitTest {
+ public static final String NAME = "testRegion";
+ protected static MemberVM locator;
+ protected static MemberVM server1, server2, server3, accessor;
+ protected static List<MemberVM> servers;
+
+ @ClassRule
+ public static ClusterStartupRule cluster = new ClusterStartupRule(5);
+
+ @ClassRule
+ public static ClientCacheRule client = new ClientCacheRule();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ locator = cluster.startLocatorVM(0);
+ int locatorPort = locator.getPort();
+ server1 = cluster.startServerVM(1,
+ s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+ server2 = cluster.startServerVM(2,
+ s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+ server3 = cluster.startServerVM(3,
+ s -> s.withConnectionToLocator(locatorPort).withRegion(RegionShortcut.PARTITION, NAME));
+ accessor = cluster.startServerVM(4, s -> s.withConnectionToLocator(locatorPort)
+ .withRegion(RegionShortcut.PARTITION_PROXY, NAME));
+
+ servers = Arrays.asList(server1, server2, server3, accessor);
+
+ client.withLocatorConnection(locatorPort).withPoolSubscription(true).createCache();
+ client.getCache()
+ .createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create(NAME);
+ }
+
+ @Test
+ public void clearMessageSentToClientWithRegisteredInterest() throws Exception {
+ Region<Object, Object> testRegion = client.getCache().getRegion(NAME);
+
+ assertThat(testRegion).isEmpty();
+ server1.invoke(() -> addRecord(10));
+ // verify that before register for interest, client region has no data
+ assertThat(testRegion).hasSize(0);
+
+ // verify that after register for interest, client region has data now.
+ testRegion.registerInterestForAllKeys();
+ assertThat(testRegion).hasSize(10);
+
+ // do PR clear on a server that has no client proxy, hence no HARegionQueue
+ Boolean cleared = false;
+ for (int i = 0; i < servers.size(); i++) {
+ cleared = servers.get(i).invoke(
+ PartitionRegionClearMessageQueueDUnitTest::clearRegionOnNoClientProxyMember);
+ if (cleared) {
+ break;
+ }
+ }
+
+ // verify that clear is called on a server
+ assertThat(cleared).isTrue();
+ // verify that the PR clear message is still delivered to the client
+ assertThat(testRegion).hasSize(0);
+ }
+
+ private static void addRecord(int size) {
+ Region<Object, Object> region = getTestRegion();
+ for (int i = 0; i < size; i++) {
+ region.put(i, "value" + i);
+ }
+ }
+
+ @Test
+ public void clearMessageSentToClientWithCQ() throws Exception {
+ QueryService queryService = client.getCache().getDefaultPool().getQueryService();
+ CqAttributesFactory cqaFactory = new CqAttributesFactory();
+ AtomicBoolean clearEventReceived = new AtomicBoolean(false);
+ cqaFactory.addCqListener(new CqListener() {
+ @Override
+ public void onEvent(CqEvent aCqEvent) {
+ Operation baseOperation = aCqEvent.getBaseOperation();
+ if (baseOperation.isClear()) {
+ clearEventReceived.set(true);
+ }
+ }
+
+ @Override
+ public void onError(CqEvent aCqEvent) {}
+ });
+
+ CqQuery cqQuery =
+ queryService.newCq("select * from /" + NAME, cqaFactory.create());
+ cqQuery.execute();
+
+ server1.invoke(() -> addRecord(10));
+
+ Boolean cleared = false;
+ for (int i = 0; i < servers.size(); i++) {
+ cleared = servers.get(i).invoke(
+ PartitionRegionClearMessageQueueDUnitTest::clearRegionOnNoClientProxyMember);
+ if (cleared) {
+ break;
+ }
+ }
+
+ // verify that clear is called on a server
+ assertThat(cleared).isTrue();
+ // verify that the PR clear message is still delivered to the client
+ assertThat(clearEventReceived.get()).isTrue();
+ }
+
+ private static boolean clearRegionOnNoClientProxyMember() {
+ Collection<CacheClientProxy> clientProxies =
+ CacheClientNotifier.getInstance().getClientProxies();
+ if (clientProxies.isEmpty()) {
+ getTestRegion().clear();
+ return true;
+ }
+ return false;
+ }
+
+ private static Region<Object, Object> getTestRegion() {
+ return getCache().getRegion("/" + NAME);
+ }
+}