You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/06/22 00:30:39 UTC
[geode] branch feature/GEODE-6900 updated: GEODE-6900: Add a unit
test with detect read conflicts
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-6900
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6900 by this push:
new 21078b4 GEODE-6900: Add a unit test with detect read conflicts
21078b4 is described below
commit 21078b4d184c892a5d3ff70f5b13ba355ee4aa67
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Jun 21 17:29:14 2019 -0700
GEODE-6900: Add a unit test with detect read conflicts
---
.../cache/TXDetectReadConflictJUnitTest.java | 108 ++++++++++++++++++---
1 file changed, 95 insertions(+), 13 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java
index 810f148..7d671cb 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java
@@ -16,9 +16,13 @@ package org.apache.geode.internal.cache;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
@@ -29,6 +33,7 @@ import org.junit.rules.TestName;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -45,10 +50,17 @@ public class TXDetectReadConflictJUnitTest {
@Rule
public TestName name = new TestName();
- protected Cache cache = null;
- protected Region region = null;
- protected Region regionpr = null;
-
+  private Cache cache = null;
+  private Region region = null;
+  private Region regionPR = null;
+  // Counted down from the after-reservation hook of the committing transaction; the competing
+  // writer thread waits on it before attempting its own commit.
+  private final CountDownLatch allowWriteTransactionToCommitLatch = new CountDownLatch(1);
+  // Counted down by the writer thread once it is done; the other transaction waits on it.
+  private final CountDownLatch allowReadTransactionToProceedLatch = new CountDownLatch(1);
+  private final String key = "key";
+  private final String key1 = "key1";
+  private final String value = "value";
+  // Must differ from `value`: the tests rely on being able to tell the two entries apart.
+  private final String value1 = "value1";
+  private final String newValue = "newValue";
+  private final String newValue1 = "newValue1";
@Before
public void setUp() throws Exception {
@@ -69,7 +81,7 @@ public class TXDetectReadConflictJUnitTest {
props.put(MCAST_PORT, "0");
props.put(LOCATORS, "");
cache = new CacheFactory(props).create();
- regionpr = cache.createRegionFactory(RegionShortcut.PARTITION).create("testRegionPR");
+ regionPR = cache.createRegionFactory(RegionShortcut.PARTITION).create("testRegionPR");
}
@After
@@ -81,12 +93,12 @@ public class TXDetectReadConflictJUnitTest {
public void testReadConflictsRR() throws Exception {
cache.close();
createCache();
- region.put("key", "value");
- region.put("key1", "value1");
+ region.put(key, value);
+ region.put(key1, value1);
TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager();
mgr.begin();
- assertEquals("value", region.get("key"));
- assertEquals("value1", region.get("key1"));
+ assertEquals(value, region.get(key));
+ assertEquals(value1, region.get(key1));
mgr.commit();
}
@@ -94,12 +106,82 @@ public class TXDetectReadConflictJUnitTest {
public void testReadConflictsPR() throws Exception {
cache.close();
createCachePR();
- regionpr.put("key", "value");
- regionpr.put("key1", "value1");
+ regionPR.put(key, value);
+ regionPR.put(key1, value1);
TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager();
mgr.begin();
- assertEquals("value", regionpr.get("key"));
- assertEquals("value1", regionpr.get("key1"));
+ assertEquals(value, regionPR.get(key));
+ assertEquals(value1, regionPR.get(key1));
mgr.commit();
}
+
+ @Test
+ public void readConflictsTransactionCanBlockWriteTransaction() {
+ cache.close();
+ createCache();
+
+ region.put(key, value);
+ region.put(key1, value1);
+ TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+ txManager.begin();
+ assertThat(region.get(key)).isSameAs(value);
+ region.put(key1, newValue1);
+ TXState txState =
+ (TXState) ((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).getRealDeal(null, null);
+ txState.setAfterReservation(() -> readTransactionAfterReservation());
+ Runnable task = () -> doPutOnReadKeyTransaction();
+ new Thread(task).start();
+ txManager.commit();
+ assertThat(region.get(key)).isSameAs(value);
+ assertThat(region.get(key1)).isSameAs(newValue1);
+ }
+
+  /**
+   * Hook passed to {@code TXState.setAfterReservation} by the committing transaction. It releases
+   * the writer thread and then waits, for at most 10 seconds, until that thread signals that it
+   * has finished.
+   *
+   * @throws IllegalStateException if the writer does not signal within the timeout
+   * @throws RuntimeException if the wait is interrupted (the interrupt flag is restored first)
+   */
+  private void readTransactionAfterReservation() {
+    allowWriteTransactionToCommitLatch.countDown();
+    boolean writerFinished;
+    try {
+      writerFinished = allowReadTransactionToProceedLatch.await(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Keep the interruption visible to whoever owns this thread.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+    if (!writerFinished) {
+      // Carrying on after a timeout would let the test continue in an unknown state.
+      throw new IllegalStateException(
+          "Timed out waiting for the write transaction to finish its commit attempt");
+    }
+  }
+
+  /**
+   * Runs on the writer thread. Starts a transaction that updates {@code key}, waits until the
+   * other transaction signals from its after-reservation hook, and then verifies that committing
+   * fails with a {@link CommitConflictException}. The waiting transaction is always released when
+   * this method exits, whether or not the checks passed.
+   */
+  private void doPutOnReadKeyTransaction() {
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    try {
+      txManager.begin();
+      region.put(key, newValue); // the commit below is expected to conflict
+      boolean readerReady;
+      try {
+        readerReady = allowWriteTransactionToCommitLatch.await(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        // Keep the interruption visible to whoever owns this thread.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      assertThat(readerReady).as("other transaction reached its after-reservation hook").isTrue();
+      assertThatThrownBy(() -> txManager.commit())
+          .isExactlyInstanceOf(CommitConflictException.class);
+    } finally {
+      // Release the committing thread even on failure so it does not sit out its full timeout.
+      allowReadTransactionToProceedLatch.countDown();
+    }
+  }
+
+  /**
+   * Reads {@code key} inside a transaction, lets a second thread commit a new value for that key,
+   * and then verifies that committing the first transaction fails with a
+   * {@link CommitConflictException}. Afterwards the writer's value for {@code key} must be
+   * visible and the first transaction's update to {@code key1} must not be.
+   */
+  @Test
+  public void readConflictsTransactionCanDetectStateChange() throws Exception {
+    cache.close();
+    createCache();
+
+    region.put(key, value);
+    region.put(key1, value1);
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    txManager.begin();
+    // Value equality, not reference identity, is what matters for these String entries.
+    assertThat(region.get(key)).isEqualTo(value);
+    region.put(key1, newValue1);
+    Thread writer = new Thread(() -> doPutTransaction());
+    writer.start();
+    // Bounded wait: if the writer thread dies before signalling, fail here instead of hanging.
+    assertThat(allowReadTransactionToProceedLatch.await(10, TimeUnit.SECONDS)).isTrue();
+    assertThatThrownBy(() -> txManager.commit()).isExactlyInstanceOf(CommitConflictException.class);
+    assertThat(region.get(key)).isEqualTo(newValue);
+    assertThat(region.get(key1)).isEqualTo(value1);
+  }
+
+  /**
+   * Runs on the writer thread. Updates {@code key} in its own transaction, commits it, and then
+   * releases the test thread. The latch is counted down even if the put or commit throws, so the
+   * test thread is never left waiting on a writer that has already died.
+   */
+  private void doPutTransaction() {
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    try {
+      txManager.begin();
+      // This commit is the one expected to go through; the test asserts that the conflict is
+      // raised afterwards, when the test thread commits.
+      region.put(key, newValue);
+      txManager.commit();
+    } finally {
+      allowReadTransactionToProceedLatch.countDown();
+    }
+  }
}