You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/04/23 05:46:39 UTC
[geode] 01/13: GEODE-1279: Rename HARegionQueueSizeRegressionTest
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit af5d6ee7e4264df5e2ff5eeb6c33ca0bb75582ed
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Wed Apr 4 17:47:32 2018 -0700
GEODE-1279: Rename HARegionQueueSizeRegressionTest
* Bug48571DUnitTest -> HARegionQueueSizeRegressionTest
* Overhaul test to use Rules, Awaitility and CacheListener Spy
---
.../geode/internal/cache/ha/Bug48571DUnitTest.java | 290 ---------------------
.../cache/ha/HARegionQueueSizeRegressionTest.java | 245 +++++++++++++++++
.../dunit/cache/internal/JUnit4CacheTestCase.java | 4 +
3 files changed, 249 insertions(+), 290 deletions(-)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug48571DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug48571DUnitTest.java
deleted file mode 100644
index 19c5a8b..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug48571DUnitTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.ha;
-
-import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
-import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
-import static org.junit.Assert.assertEquals;
-
-import java.util.Collection;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.OSProcess;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.test.dunit.DistributedTestUtils;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-@Category({DistributedTest.class, ClientSubscriptionTest.class})
-public class Bug48571DUnitTest extends JUnit4DistributedTestCase {
-
- private static VM server = null;
- private VM client = null;
- private static GemFireCacheImpl cache = null;
-
- private static final String region = Bug48571DUnitTest.class.getSimpleName() + "_region";
- private static int numOfCreates = 0;
- private static int numOfUpdates = 0;
- private static int numOfInvalidates = 0;
- private static boolean lastKeyReceived = false;
-
- @Override
- public final void postSetUp() throws Exception {
- final Host host = Host.getHost(0);
- server = host.getVM(0);
- client = host.getVM(1);
- }
-
- @Override
- public final void preTearDown() throws Exception {
- reset();
- server.invoke(Bug48571DUnitTest::reset);
- client.invoke(Bug48571DUnitTest::reset);
- }
-
- private static void reset() {
- lastKeyReceived = false;
- numOfCreates = 0;
- numOfUpdates = 0;
- numOfInvalidates = 0;
- if (cache != null && !cache.isClosed()) {
- cache.close();
- cache.getDistributedSystem().disconnect();
- }
- }
-
- @Test
- public void testStatsMatchWithSize() throws Exception {
- IgnoredException.addIgnoredException("Unexpected IOException||Connection reset");
- // start a server
- int port = server.invoke(Bug48571DUnitTest::createServerCache);
- // create durable client, with durable RI
- client.invoke(() -> Bug48571DUnitTest.createClientCache(client.getHost(), port));
- // do puts on server from three different threads, pause after 500 puts each.
- server.invoke(Bug48571DUnitTest::doPuts);
- // close durable client
- client.invoke(Bug48571DUnitTest::closeClientCache);
-
- server.invoke("verifyProxyHasBeenPaused", Bug48571DUnitTest::verifyProxyHasBeenPaused);
- // resume puts on server, add another 100.
- server.invoke(Bug48571DUnitTest::resumePuts);
- // start durable client
- client.invoke(() -> Bug48571DUnitTest.createClientCache(client.getHost(), port));
- // wait for full queue dispatch
- client.invoke(Bug48571DUnitTest::waitForLastKey);
- // verify the stats
- server.invoke(Bug48571DUnitTest::verifyStats);
- }
-
- private static void verifyProxyHasBeenPaused() {
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- Collection<CacheClientProxy> ccProxies = ccn.getClientProxies();
- boolean pausedFlag = false;
-
- for (CacheClientProxy ccp : ccProxies) {
- System.out.println("proxy status " + ccp.getState());
- if (ccp.isPaused()) {
- pausedFlag = true;
- break;
- }
- }
- assertEquals("Proxy has not been paused in 1 minute", true, pausedFlag);
- });
- }
-
- private static int createServerCache() throws Exception {
- Properties props = new Properties();
- props.setProperty(LOCATORS, "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");
- props.setProperty(LOG_FILE, "server_" + OSProcess.getId() + ".log");
- props.setProperty(LOG_LEVEL, "info");
- props.setProperty(STATISTIC_ARCHIVE_FILE, "server_" + OSProcess.getId() + ".gfs");
- props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
- CacheFactory cf = new CacheFactory(props);
-
- DistributedSystem ds = new Bug48571DUnitTest().getSystem(props);
- ds.disconnect();
-
- cache = (GemFireCacheImpl) cf.create();
-
- RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
- rf.setConcurrencyChecksEnabled(false);
- rf.create(region);
-
- CacheServer server1 = cache.addCacheServer();
- server1.setPort(0);
- server1.start();
- return server1.getPort();
- }
-
- private static void closeClientCache() {
- cache.close(true);
- }
-
- private static void createClientCache(Host host, Integer port) {
-
- Properties props = new Properties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "");
- props.setProperty(DURABLE_CLIENT_ID, "durable-48571");
- props.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
-
- props.setProperty(LOG_FILE, "client_" + OSProcess.getId() + ".log");
- props.setProperty(LOG_LEVEL, "info");
- props.setProperty(STATISTIC_ARCHIVE_FILE, "client_" + OSProcess.getId() + ".gfs");
- props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
-
- ClientCacheFactory ccf = new ClientCacheFactory(props);
- ccf.setPoolSubscriptionEnabled(true);
- ccf.setPoolSubscriptionAckInterval(50);
- ccf.setPoolSubscriptionRedundancy(0);
- ccf.addPoolServer(host.getHostName(), port);
-
- DistributedSystem ds = new Bug48571DUnitTest().getSystem(props);
- ds.disconnect();
-
- cache = (GemFireCacheImpl) ccf.create();
-
- ClientRegionFactory<String, String> crf =
- cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
- crf.setConcurrencyChecksEnabled(false);
-
- crf.addCacheListener(new CacheListenerAdapter<String, String>() {
- public void afterInvalidate(EntryEvent<String, String> event) {
- cache.getLoggerI18n()
- .fine("Invalidate Event: " + event.getKey() + ", " + event.getNewValue());
- numOfInvalidates++;
- }
-
- public void afterCreate(EntryEvent<String, String> event) {
- if (event.getKey().equals("last_key")) {
- lastKeyReceived = true;
- }
- cache.getLoggerI18n().fine("Create Event: " + event.getKey() + ", " + event.getNewValue());
- numOfCreates++;
- }
-
- public void afterUpdate(EntryEvent<String, String> event) {
- cache.getLoggerI18n().fine("Update Event: " + event.getKey() + ", " + event.getNewValue());
- numOfUpdates++;
- }
- });
-
- Region<String, String> r = crf.create(region);
- r.registerInterest("ALL_KEYS", true);
- cache.readyForEvents();
- }
-
- private static void doPuts() throws Exception {
- final Region<String, String> r = cache.getRegion(region);
- Thread t1 = new Thread(() -> {
- for (int i = 0; i < 500; i++) {
- r.put("T1_KEY_" + i, "VALUE_" + i);
- }
- });
- Thread t2 = new Thread(() -> {
- for (int i = 0; i < 500; i++) {
- r.put("T2_KEY_" + i, "VALUE_" + i);
- }
- });
- Thread t3 = new Thread(() -> {
- for (int i = 0; i < 500; i++) {
- r.put("T3_KEY_" + i, "VALUE_" + i);
- }
- });
-
- t1.start();
- t2.start();
- t3.start();
-
- t1.join();
- t2.join();
- t3.join();
- }
-
- private static void resumePuts() {
- Region<String, String> r = cache.getRegion(region);
- for (int i = 0; i < 100; i++) {
- r.put("NEWKEY_" + i, "NEWVALUE_" + i);
- }
- r.put("last_key", "last_value");
- }
-
- private static void waitForLastKey() {
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return lastKeyReceived;
- }
-
- @Override
- public String description() {
- return "Did not receive last key.";
- }
- };
- Wait.waitForCriterion(wc, 60 * 1000, 500, true);
- }
-
- private static void verifyStats() {
- Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
- CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- CacheClientProxy ccp = ccn.getClientProxies().iterator().next();
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getQueueSize() " + ccp.getQueueSize());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getQueueSizeStat() " + ccp.getQueueSizeStat());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getEventsEnqued() " + ccp.getHARegionQueue().getStatistics().getEventsEnqued());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getEventsDispatched() " + ccp.getHARegionQueue().getStatistics().getEventsDispatched());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getEventsRemoved() " + ccp.getHARegionQueue().getStatistics().getEventsRemoved());
- cache.getLoggerI18n().info(LocalizedStrings.DEBUG,
- "getNumVoidRemovals() " + ccp.getHARegionQueue().getStatistics().getNumVoidRemovals());
- assertEquals("The queue size did not match the stat value", ccp.getQueueSize(),
- ccp.getQueueSizeStat());
- });
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueSizeRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueSizeRegressionTest.java
new file mode 100644
index 0000000..f86c1f7
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueSizeRegressionTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.client.ClientRegionShortcut.CACHING_PROXY;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Client queue size statistic should not go negative when client goes offline.
+ *
+ * <p>
+ * TRAC #48571: CacheClientProxy.getQueueSizeStat() gives negative numbers when client goes down.
+ */
+@Category({DistributedTest.class, ClientSubscriptionTest.class})
+public class HARegionQueueSizeRegressionTest implements Serializable {
+
+ private static final AtomicInteger numOfPuts = new AtomicInteger();
+
+ private static volatile CacheListener<String, String> spyCacheListener;
+
+ private String regionName;
+ private String hostName;
+
+ private VM server;
+ private VM client;
+
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() throws Exception { // assigns the VMs and derives the names used by the test
+ server = getVM(0);
+ client = getVM(1);
+ // hostName is later passed to the client pool; regionName embeds the test method name
+ hostName = getHostName();
+ regionName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ // same ignored-exception text that the replaced Bug48571DUnitTest registered
+ addIgnoredException("Unexpected IOException||Connection reset");
+ }
+
+ @After
+ public void tearDown() throws Exception { // clears the listener spy after each test
+ invokeInEveryVM(() -> {
+ spyCacheListener = null; // the field is static, so it is reset explicitly
+ });
+ }
+
+ @Test
+ public void testStatsMatchWithSize() throws Exception {
+ // start a server; the returned value is the port read back from its cache server
+ int port = server.invoke(() -> createServerCache());
+
+ // create durable client, with durable RI (interest registered on "ALL_KEYS")
+ client.invoke(() -> createClientCache(hostName, port));
+
+ // do puts on server: keys KEY-1 .. KEY-10
+ server.invoke(() -> doPuts(10));
+ // wait until the client's listener spy has recorded exactly 10 afterCreate calls
+ client.invoke(() -> awaitCreates(10));
+
+ // close durable client, passing true (keep-alive) to close
+ client.invoke(() -> closeClientCacheWithKeepAlive());
+ // wait until the server sees a paused client proxy
+ server.invoke(() -> awaitProxyIsPaused());
+
+ // resume puts on server after the client has been closed: keys NEWKEY-1 .. NEWKEY-10
+ server.invoke(() -> resumePuts(10));
+
+ // start durable client again with the same durable id; this installs a fresh listener spy
+ client.invoke(() -> createClientCache(hostName, port));
+
+ // wait for full queue dispatch: the fresh spy must record exactly 10 afterCreate calls
+ client.invoke(() -> awaitCreates(10));
+
+ // verify the stats: the queue size statistic must equal the queue size
+ server.invoke(() -> verifyStats());
+ }
+
+ private int createServerCache() throws IOException { // returns the started cache server's port
+ cacheRule.createCache();
+ // REPLICATE region with concurrency checks disabled
+ RegionFactory<String, String> rf = cacheRule.getCache().createRegionFactory(REPLICATE);
+ rf.setConcurrencyChecksEnabled(false);
+ rf.create(regionName);
+ // port 0 is requested; the value returned is read back from the server after start()
+ CacheServer server1 = cacheRule.getCache().addCacheServer();
+ server1.setPort(0);
+ server1.start();
+ return server1.getPort();
+ }
+
+ private void createClientCache(String hostName, Integer port) { // hostName shadows the field
+ Properties config = new Properties();
+ config.setProperty(MCAST_PORT, "0");
+ config.setProperty(LOCATORS, "");
+ config.setProperty(DURABLE_CLIENT_ID, "durable-48571"); // same id on every call
+ config.setProperty(DURABLE_CLIENT_TIMEOUT, "300000");
+ // pool: one server at hostName:port, subscriptions on, ack interval 50, redundancy 0
+ ClientCacheFactory ccf = new ClientCacheFactory(config);
+ ccf.setPoolSubscriptionEnabled(true);
+ ccf.setPoolSubscriptionAckInterval(50);
+ ccf.setPoolSubscriptionRedundancy(0);
+ ccf.addPoolServer(hostName, port);
+
+ clientCacheRule.createClientCache(ccf);
+ // CACHING_PROXY region with concurrency checks disabled
+ ClientRegionFactory<String, String> crf =
+ clientCacheRule.getClientCache().createClientRegionFactory(CACHING_PROXY);
+ crf.setConcurrencyChecksEnabled(false);
+ // a fresh spy replaces the static field on every call, so earlier invocations are not counted
+ spyCacheListener = spy(CacheListener.class);
+ crf.addCacheListener(spyCacheListener);
+ // interest in "ALL_KEYS"; the true flag is presumably isDurable - TODO confirm
+ Region<String, String> region = crf.create(regionName);
+ region.registerInterest("ALL_KEYS", true);
+
+ clientCacheRule.getClientCache().readyForEvents();
+ }
+
+ private void closeClientCacheWithKeepAlive() {
+ clientCacheRule.getClientCache().close(true); // true is the keep-alive flag the name refers to
+ }
+
+ private void doPuts(int creates) { // puts KEY-1 .. KEY-<creates> into the region
+ Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+ // loop is 1-based and inclusive, so key and value suffixes run from 1 to creates
+ for (int i = 1; i <= creates; i++) {
+ put(region, "KEY-" + i, "VALUE-" + i);
+ }
+ }
+
+ private void awaitProxyIsPaused() { // bounded by atMost(60, SECONDS)
+ Awaitility.await().atMost(60, SECONDS).until(() -> {
+ CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+ Collection<CacheClientProxy> ccProxies = ccn.getClientProxies();
+ // one paused proxy is enough; an empty collection leaves the flag false
+ boolean pausedFlag = false;
+ for (CacheClientProxy ccp : ccProxies) {
+ if (ccp.isPaused()) {
+ pausedFlag = true;
+ break;
+ }
+ }
+ assertThat(pausedFlag).as("Proxy has not been paused in 1 minute").isTrue();
+ });
+ }
+
+ private void resumePuts(int creates) { // puts NEWKEY-1 .. NEWKEY-<creates> into the region
+ Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+ for (int i = 1; i <= creates; i++) {
+ put(region, "NEWKEY-" + i, "VALUE_" + i); // NOTE(review): "VALUE_" here, "VALUE-" in doPuts
+ }
+ }
+
+ private void put(Region<String, String> region, String key, String value) {
+ region.put(key, value);
+ numOfPuts.incrementAndGet(); // NOTE(review): numOfPuts is never read in this class as shown
+ }
+
+ private void awaitCreates(int expectedCreates) { // bounded by atMost(60, SECONDS)
+ Awaitility.await().atMost(60, SECONDS).until(() -> {
+ verify(spyCacheListener, times(expectedCreates)).afterCreate(any()); // exact count
+ });
+ }
+
+ private void verifyStats() { // bounded by atMost(60, SECONDS)
+ Awaitility.await().atMost(60, SECONDS).until(() -> {
+ CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+ CacheClientProxy ccp = ccn.getClientProxies().iterator().next(); // first proxy; throws if none
+ // TODO: consider verifying ccp.getQueueSize()
+ // TODO: consider verifying ccp.getQueueSizeStat()
+ // TODO: consider verifying ccp.getHARegionQueue().getStatistics().getEventsEnqued()
+ // TODO: consider verifying ccp.getHARegionQueue().getStatistics().getEventsDispatched()
+ // TODO: consider verifying ccp.getHARegionQueue().getStatistics().getEventsRemoved()
+ // TODO: consider verifying ccp.getHARegionQueue().getStatistics().getNumVoidRemovals()
+ assertThat(ccp.getQueueSizeStat()).as("The queue size did not match the stat value")
+ .isEqualTo(ccp.getQueueSize());
+ });
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
index ac25583..c93f921 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
@@ -262,6 +262,10 @@ public abstract class JUnit4CacheTestCase extends JUnit4DistributedTestCase
}
}
+ public final ClientCache getClientCache() { // NOTE(review): no type check before the cast
+ return (ClientCache) cache; // ClassCastException if cache is not a ClientCache
+ }
+
/**
* Invokes {@link #getCache()} and casts the return to {@code GemFireCacheImpl}.
*
--
To stop receiving notification emails like this one, please contact
klund@apache.org.