You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/03/26 17:37:44 UTC
[geode] 19/19: GEODE-1279: Rename Bug36853EventsExpiryDUnitTest as
ClientSubscriptionExpiryDataLossRegressionTest
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3b32fd569b196f7ad1d3e9460f3d5debf67d3ae0
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Mar 23 16:38:33 2018 -0700
GEODE-1279: Rename Bug36853EventsExpiryDUnitTest as ClientSubscriptionExpiryDataLossRegressionTest
* Rewrite test with a spy CacheListener.
---
.../cache/ha/Bug36853EventsExpiryDUnitTest.java | 268 ---------------------
...ntSubscriptionExpiryDataLossRegressionTest.java | 195 +++++++++++++++
2 files changed, 195 insertions(+), 268 deletions(-)
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug36853EventsExpiryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug36853EventsExpiryDUnitTest.java
deleted file mode 100755
index 8dee70b..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug36853EventsExpiryDUnitTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.ha;
-
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionAttributes;
-import org.apache.geode.cache.Scope;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.cache30.CacheTestCase;
-import org.apache.geode.cache30.ClientServerTestCase;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.internal.AvailablePort;
-import org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTest;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.LogWriterUtils;
-import org.apache.geode.test.dunit.NetworkUtils;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-/**
- * This is a bug test for 36853 (Expiry logic in HA is used to expire early data that a secondary
- * picks up that is not in the primary. But it is also possible that it would cause data that is in
- * the primary queue to be expired. And this can cause a data loss. This issue is mostly related to
- * Expiry mechanism and not HA, but it affects HA functionality).
- *
- * This test has a cache-client connected to one cache-server. The expiry-time of events in the
- * queue for the client at the server is set low and dispatcher is set for delayed start. This will
- * make some of the events in the queue expire before dispatcher can start picking them up for
- * delivery to the client.
- */
-@Category({DistributedTest.class, ClientSubscriptionTest.class})
-public class Bug36853EventsExpiryDUnitTest extends JUnit4CacheTestCase {
-
- /** Cache-server */
- private VM server = null;
-
- /** Client , connected to Cache-server */
- private VM client = null;
-
- /** Name of the test region */
- private static final String REGION_NAME =
- Bug36853EventsExpiryDUnitTest.class.getSimpleName() + "_region";
-
- /** The cache instance for test cases */
- private static Cache cache = null;
-
- /** Boolean to indicate the client to proceed for validation */
- private static volatile boolean proceedForValidation = false;
-
- /** Counter to indicate number of puts recieved by client */
- private static volatile int putsRecievedByClient;
-
- /** The last key for operations, to notify for proceeding to validation */
- private static final String LAST_KEY = "LAST_KEY";
-
- /** The time in milliseconds by which the start of dispatcher will be delayed */
- private static final int DISPATCHER_SLOWSTART_TIME = 10000;
-
- /** Number of puts done for the test */
- private static final int TOTAL_PUTS = 5;
-
- @Override
- public final void preSetUp() throws Exception {
- disconnectAllFromDS();
- }
-
- @Override
- public final void postSetUp() throws Exception {
- final Host host = Host.getHost(0);
- server = host.getVM(0);
- client = host.getVM(1);
- server.invoke(() -> ConflationDUnitTest.setIsSlowStart());
- int PORT2 = ((Integer) server.invoke(() -> Bug36853EventsExpiryDUnitTest.createServerCache()))
- .intValue();
-
- client.invoke(() -> Bug36853EventsExpiryDUnitTest
- .createClientCache(NetworkUtils.getServerHostName(host), new Integer(PORT2)));
- }
-
- /**
- * Creates the cache
- *
- * @param props - distributed system props
- * @throws Exception - thrown in any problem occurs in creating cache
- */
- private void createCache(Properties props) throws Exception {
- DistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- assertNotNull(cache);
- }
-
- /**
- * Creates cache and starts the bridge-server
- */
- private static Integer createServerCache() throws Exception {
- System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, "1");
- System.setProperty("slowStartTimeForTesting", String.valueOf(DISPATCHER_SLOWSTART_TIME));
- new Bug36853EventsExpiryDUnitTest().createCache(new Properties());
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- RegionAttributes attrs = factory.create();
- cache.createRegion(REGION_NAME, attrs);
-
- CacheServer server = cache.addCacheServer();
- assertNotNull(server);
- int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- server.setPort(port);
- server.setNotifyBySubscription(true);
- server.start();
- return new Integer(server.getPort());
- }
-
- /**
- * Creates the client cache
- *
- * @param hostName the name of the server's machine
- * @param port - bridgeserver port
- * @throws Exception - thrown if any problem occurs in setting up the client
- */
- private static void createClientCache(String hostName, Integer port) throws Exception {
- Properties props = new Properties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "");
- new Bug36853EventsExpiryDUnitTest().createCache(props);
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- ClientServerTestCase.configureConnectionPool(factory, hostName, port.intValue(), -1, true, -1,
- 2, null);
-
- factory.addCacheListener(new CacheListenerAdapter() {
- public void afterCreate(EntryEvent event) {
- String key = (String) event.getKey();
- LogWriterUtils.getLogWriter().info("client2 : afterCreate : key =" + key);
- if (key.equals(LAST_KEY)) {
-
- synchronized (Bug36853EventsExpiryDUnitTest.class) {
- LogWriterUtils.getLogWriter().info("Notifying client2 to proceed for validation");
- proceedForValidation = true;
- Bug36853EventsExpiryDUnitTest.class.notify();
- }
- } else {
- putsRecievedByClient++;
- }
- }
- });
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(REGION_NAME, attrs);
-
- region.registerInterest("ALL_KEYS");
- }
-
- /**
- * First generates some events, then waits for the time equal to that of delayed start of the
- * dispatcher and then does put on the last key for few iterations. The idea is to let the events
- * added, before waiting, to expire before the dispatcher to pick them up and then do a put on a
- * LAST_KEY couple of times so that atleast one of these is dispatched to client and when client
- * recieves this in the listener, the test is notified to proceed for validation.
- *
- * @throws Exception - thrown if any problem occurs in put operation
- */
- private static void generateEvents() throws Exception {
- String regionName = Region.SEPARATOR + REGION_NAME;
- Region region = cache.getRegion(regionName);
- for (int i = 0; i < TOTAL_PUTS; i++) {
-
- region.put("key" + i, "val-" + i);
- }
- Thread.sleep(DISPATCHER_SLOWSTART_TIME + 1000);
- for (int i = 0; i < 25; i++) {
-
- region.put(LAST_KEY, "LAST_VALUE");
- }
- }
-
- /**
- * First generates some events, then waits for the time equal to that of delayed start of the
- * dispatcher and then does put on the last key for few iterations. Whenever the client the create
- * corresponding to the LAST_KEY in the listener, the test is notified to proceed for validation.
- * Then, it is validated that all the events that were added prior to the LAST_KEY are dispatched
- * to the client. Due to the bug#36853, those events will expire and validation will fail.
- *
- * @throws Exception - thrown if any exception occurs in test
- */
- @Test
- public void testEventsExpiryBug() throws Exception {
- IgnoredException.addIgnoredException("Unexpected IOException");
- IgnoredException.addIgnoredException("Connection reset");
- server.invoke(() -> Bug36853EventsExpiryDUnitTest.generateEvents());
- client.invoke(() -> Bug36853EventsExpiryDUnitTest.validateEventCountAtClient());
- }
-
- /**
- * Waits for the listener to receive all events and validates that no exception occurred in client
- */
- private static void validateEventCountAtClient() throws Exception {
- if (!proceedForValidation) {
- synchronized (Bug36853EventsExpiryDUnitTest.class) {
- if (!proceedForValidation)
- try {
- LogWriterUtils.getLogWriter().info("Client2 going in wait before starting validation");
- Bug36853EventsExpiryDUnitTest.class.wait(5000);
- } catch (InterruptedException e) {
- fail("interrupted");
- }
- }
- }
- LogWriterUtils.getLogWriter().info("Starting validation on client2");
- assertEquals("Puts recieved by client not equal to the puts done at server.", TOTAL_PUTS,
- putsRecievedByClient);
- LogWriterUtils.getLogWriter().info("putsRecievedByClient = " + putsRecievedByClient);
- LogWriterUtils.getLogWriter().info("Validation complete on client2");
-
- }
-
- /**
- * Closes the cache
- *
- */
- private static void unSetExpiryTimeAndCloseCache() {
- System.clearProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME);
- CacheTestCase.closeCache();
- }
-
- /**
- * Closes the caches on clients and servers
- *
- * @throws Exception - thrown if any problem occurs in closing client and server caches.
- */
- @Override
- public final void preTearDownCacheTestCase() throws Exception {
- // close client
- client.invoke(() -> Bug36853EventsExpiryDUnitTest.unSetExpiryTimeAndCloseCache());
- // close server
- server.invoke(() -> Bug36853EventsExpiryDUnitTest.unSetExpiryTimeAndCloseCache());
-
- }
-
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java
new file mode 100755
index 0000000..668ea8b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache30.ClientServerTestCase.configureConnectionPool;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTest.setIsSlowStart;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * This is a bug test for 36853 (Expiry logic in HA is used to expire early data that a secondary
+ * picks up that is not in the primary. But it is also possible that it would cause data that is in
+ * the primary queue to be expired. And this can cause a data loss. This issue is mostly related to
+ * Expiry mechanism and not HA, but it affects HA functionality).
+ *
+ * <p>
+ * This test has a cache-client connected to one cache-server. The expiry-time of events in the
+ * queue for the client at the server is set low and dispatcher is set for delayed start. This will
+ * make some of the events in the queue expire before dispatcher can start picking them up for
+ * delivery to the client.
+ *
+ * <p>
+ * TRAC #36853: HA events can expire on primary server and this can cause data loss.
+ */
+@Category({DistributedTest.class, ClientSubscriptionTest.class})
+public class ClientSubscriptionExpiryDataLossRegressionTest extends CacheTestCase {
+
+ /** The time in milliseconds by which the start of dispatcher will be delayed */
+ private static final int DISPATCHER_SLOWSTART_TIME = 10_000;
+ private static final int PUT_COUNT = 5;
+
+ private static CacheListener<String, String> spyCacheListener;
+
+ private String uniqueName;
+ private String hostName;
+ private int serverPort;
+
+ private VM server;
+ private VM client;
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setUp() throws Exception {
+ server = getHost(0).getVM(0);
+ client = getHost(0).getVM(1);
+
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ hostName = getServerHostName(getHost(0));
+
+ server.invoke(() -> setIsSlowStart());
+ serverPort = server.invoke(() -> createServerCache());
+
+ client.invoke(() -> createClientCache());
+
+ addIgnoredException("Unexpected IOException");
+ addIgnoredException("Connection reset");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ disconnectAllFromDS();
+ }
+
+ /**
+ * First generate some events, then wait for some time to let the initial events expire before
+ * the dispatcher sends them to the client. Then do one final put so that the client knows when
+ * to being validation.
+ *
+ * <p>
+ * Client is waiting for afterCreate to be invoked number of PUT_COUNT times before proceeding
+ * with validation.
+ *
+ * <p>
+ * If the bug exists or is reintroduced, then the events will expire without reaching the client.
+ */
+ @Test
+ public void allEventsShouldReachClientWithoutExpiring() throws Exception {
+ server.invoke(() -> generateEvents());
+ client.invoke(() -> validateEventCountAtClient());
+ }
+
+ private int createServerCache() throws IOException {
+ System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, String.valueOf(1));
+ System.setProperty("slowStartTimeForTesting", String.valueOf(DISPATCHER_SLOWSTART_TIME));
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+
+ getCache().createRegion(uniqueName, factory.create());
+
+ CacheServer server = getCache().addCacheServer();
+ server.setPort(0);
+ server.setNotifyBySubscription(true);
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientCache() {
+ Properties config = new Properties();
+ config.setProperty(MCAST_PORT, "0");
+ config.setProperty(LOCATORS, "");
+
+ getCache(config);
+
+ spyCacheListener = spy(CacheListener.class);
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.addCacheListener(spyCacheListener);
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+
+ configureConnectionPool(factory, hostName, serverPort, -1, true, -1, 2, null);
+
+ Region region = getCache().createRegion(uniqueName, factory.create());
+
+ region.registerInterest("ALL_KEYS");
+ }
+
+ /**
+ * First generate some events, then wait for some time to let the initial events expire before
+ * the dispatcher sends them to the client. Then do one final put so that the client knows when
+ * to being validation.
+ *
+ * <p>
+ * Client is waiting for afterCreate to be invoked number of PUT_COUNT times before proceeding
+ * with validation.
+ */
+ private void generateEvents() throws InterruptedException {
+ Region<String, String> region = getCache().getRegion(uniqueName);
+ for (int i = 0; i < PUT_COUNT - 1; i++) {
+ region.put("key" + i, "val-" + i);
+ }
+
+ Thread.sleep(DISPATCHER_SLOWSTART_TIME + 1000);
+
+ region.put("key" + PUT_COUNT, "LAST_VALUE");
+ }
+
+ /**
+ * Waits for the listener to receive all events
+ */
+ private void validateEventCountAtClient() {
+ await().atMost(1, MINUTES)
+ .until(() -> verify(spyCacheListener, times(PUT_COUNT)).afterCreate(any()));
+ }
+}
--
To stop receiving notification emails like this one, please contact
klund@apache.org.