You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/06/22 00:30:39 UTC

[geode] branch feature/GEODE-6900 updated: GEODE-6900: Add a unit test with detect read conflicts

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6900
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6900 by this push:
     new 21078b4  GEODE-6900: Add a unit test with detect read conflicts
21078b4 is described below

commit 21078b4d184c892a5d3ff70f5b13ba355ee4aa67
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Jun 21 17:29:14 2019 -0700

    GEODE-6900: Add a unit test with detect read conflicts
---
 .../cache/TXDetectReadConflictJUnitTest.java       | 108 ++++++++++++++++++---
 1 file changed, 95 insertions(+), 13 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java
index 810f148..7d671cb 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java
@@ -16,9 +16,13 @@ package org.apache.geode.internal.cache;
 
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.After;
 import org.junit.Before;
@@ -29,6 +33,7 @@ import org.junit.rules.TestName;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -45,10 +50,17 @@ public class TXDetectReadConflictJUnitTest {
   @Rule
   public TestName name = new TestName();
 
-  protected Cache cache = null;
-  protected Region region = null;
-  protected Region regionpr = null;
-
+  private Cache cache = null;
+  private Region region = null;
+  private Region regionPR = null;
+  // Counted down by readTransactionAfterReservation(); doPutOnReadKeyTransaction() waits on it
+  // before attempting its commit.
+  private CountDownLatch allowWriteTransactionToCommitLatch = new CountDownLatch(1);
+  // Counted down by the writer threads (doPutOnReadKeyTransaction() / doPutTransaction()); the
+  // read transaction waits on it before it continues.
+  private CountDownLatch allowReadTransactionToProceedLatch = new CountDownLatch(1);
+  private final String key = "key";
+  private final String key1 = "key1";
+  private final String value = "value";
+  // Must differ from value: the tests compare entries with isSameAs(), and two identical
+  // literals are the same interned object, which would make key and key1 indistinguishable.
+  private final String value1 = "value1";
+  private final String newValue = "newValue";
+  private final String newValue1 = "newValue1";
 
   @Before
   public void setUp() throws Exception {
@@ -69,7 +81,7 @@ public class TXDetectReadConflictJUnitTest {
     props.put(MCAST_PORT, "0");
     props.put(LOCATORS, "");
     cache = new CacheFactory(props).create();
-    regionpr = cache.createRegionFactory(RegionShortcut.PARTITION).create("testRegionPR");
+    regionPR = cache.createRegionFactory(RegionShortcut.PARTITION).create("testRegionPR");
   }
 
   @After
@@ -81,12 +93,12 @@ public class TXDetectReadConflictJUnitTest {
   public void testReadConflictsRR() throws Exception {
     cache.close();
     createCache();
-    region.put("key", "value");
-    region.put("key1", "value1");
+    region.put(key, value);
+    region.put(key1, value1);
     TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager();
     mgr.begin();
-    assertEquals("value", region.get("key"));
-    assertEquals("value1", region.get("key1"));
+    assertEquals(value, region.get(key));
+    assertEquals(value1, region.get(key1));
     mgr.commit();
   }
 
@@ -94,12 +106,82 @@ public class TXDetectReadConflictJUnitTest {
   public void testReadConflictsPR() throws Exception {
     cache.close();
     createCachePR();
-    regionpr.put("key", "value");
-    regionpr.put("key1", "value1");
+    regionPR.put(key, value);
+    regionPR.put(key1, value1);
     TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager();
     mgr.begin();
-    assertEquals("value", regionpr.get("key"));
-    assertEquals("value1", regionpr.get("key1"));
+    assertEquals(value, regionPR.get(key));
+    assertEquals(value1, regionPR.get(key1));
     mgr.commit();
   }
+
+  /**
+   * A transaction that has read {@code key} and written {@code key1} commits while a second
+   * thread tries to commit a write to {@code key}. The second thread asserts that its own commit
+   * fails with a {@link CommitConflictException} (see {@code doPutOnReadKeyTransaction()}), so
+   * after this commit {@code key} must still hold its original value and {@code key1} the new
+   * one.
+   */
+  @Test
+  public void readConflictsTransactionCanBlockWriteTransaction() {
+    cache.close();
+    createCache();
+    region.put(key, value);
+    region.put(key1, value1);
+
+    TXManagerImpl readTxManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    readTxManager.begin();
+    assertThat(region.get(key)).isSameAs(value);
+    region.put(key1, newValue1);
+
+    // Install the hook on the underlying TXState before committing. The hook releases the
+    // writer thread and then waits for it (see readTransactionAfterReservation()).
+    // NOTE(review): presumably the hook fires during commit, after reservation - confirm.
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) TXManagerImpl.getCurrentTXState();
+    TXState realTxState = (TXState) txStateProxy.getRealDeal(null, null);
+    realTxState.setAfterReservation(this::readTransactionAfterReservation);
+
+    new Thread(this::doPutOnReadKeyTransaction).start();
+    readTxManager.commit();
+
+    assertThat(region.get(key)).isSameAs(value);
+    assertThat(region.get(key1)).isSameAs(newValue1);
+  }
+
+  /**
+   * Hook installed on the read transaction via {@code TXState.setAfterReservation}. It lets the
+   * writer thread attempt its commit, then waits up to 10 seconds for that thread to signal that
+   * the read transaction may continue. A timeout is not treated as an error here: the result of
+   * {@code await} is ignored and the read transaction simply proceeds.
+   *
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
+   */
+  private void readTransactionAfterReservation() {
+    allowWriteTransactionToCommitLatch.countDown();
+    try {
+      allowReadTransactionToProceedLatch.await(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to callers further up the stack before rethrowing.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Writer side of {@code readConflictsTransactionCanBlockWriteTransaction}; runs on its own
+   * thread. Starts a transaction, updates {@code key}, waits up to 10 seconds for the read
+   * transaction's hook to signal, and then asserts that committing fails with a
+   * {@link CommitConflictException}.
+   *
+   * <p>
+   * The read transaction is always released when this method exits, even if the assertion
+   * fails or the wait is interrupted, so it does not have to sit out its own timeout.
+   *
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted
+   */
+  private void doPutOnReadKeyTransaction() {
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    txManager.begin();
+    region.put(key, newValue); // expect commit conflict
+    try {
+      allowWriteTransactionToCommitLatch.await(10, TimeUnit.SECONDS);
+      assertThatThrownBy(() -> txManager.commit())
+          .isExactlyInstanceOf(CommitConflictException.class);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible before rethrowing.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } finally {
+      allowReadTransactionToProceedLatch.countDown();
+    }
+  }
+
+  /**
+   * A transaction reads {@code key} and writes {@code key1}; before it commits, another thread
+   * commits a new value for {@code key} (see {@code doPutTransaction()}). The first
+   * transaction's commit must then fail with a {@link CommitConflictException}, leaving the
+   * other thread's value for {@code key} and the original value for {@code key1}.
+   */
+  @Test
+  public void readConflictsTransactionCanDetectStateChange() throws Exception {
+    cache.close();
+    createCache();
+
+    region.put(key, value);
+    region.put(key1, value1);
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    txManager.begin();
+    assertThat(region.get(key)).isSameAs(value);
+    region.put(key1, newValue1);
+    Runnable task = () -> doPutTransaction();
+    new Thread(task).start();
+    // Bounded wait: fail quickly instead of hanging forever if the writer thread never signals.
+    assertThat(allowReadTransactionToProceedLatch.await(10, TimeUnit.SECONDS)).isTrue();
+    assertThatThrownBy(() -> txManager.commit()).isExactlyInstanceOf(CommitConflictException.class);
+    assertThat(region.get(key)).isSameAs(newValue);
+    assertThat(region.get(key1)).isSameAs(value1);
+  }
+
+  /**
+   * Writer side of {@code readConflictsTransactionCanDetectStateChange}; runs on its own thread.
+   * Commits a new value for {@code key} in a separate transaction and then lets the read
+   * transaction continue. The latch is released even if this transaction fails, so the waiting
+   * test thread is never left blocked on it.
+   */
+  private void doPutTransaction() {
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    try {
+      txManager.begin();
+      // This commit is expected to succeed; the conflict is expected later, when the read
+      // transaction that already read this key tries to commit.
+      region.put(key, newValue);
+      txManager.commit();
+    } finally {
+      allowReadTransactionToProceedLatch.countDown();
+    }
+  }
 }