You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2016/10/21 18:59:34 UTC

incubator-geode git commit: GEODE-2021: Non colocated gets in a transaction should get TransactionDataNotColocatedException

Repository: incubator-geode
Updated Branches:
  refs/heads/develop fadd92b05 -> 56917a26a


GEODE-2021: Non colocated gets in a transaction should get TransactionDataNotColocatedException

Throw TransactionDataNotColocatedException when get locally failed with BucketNotFoundException
Added a dunit test with two transactions with gets that will use TXStateStub or TXState based on data location.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/56917a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/56917a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/56917a26

Branch: refs/heads/develop
Commit: 56917a26a8916b83f0cec6e85285b5040ff66ee6
Parents: fadd92b
Author: eshu <es...@pivotal.io>
Authored: Fri Oct 21 11:43:36 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Oct 21 11:43:36 2016 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/PartitionedRegion.java |   6 +
 .../apache/geode/disttx/PRDistTXDUnitTest.java  |   5 +
 .../disttx/PRDistTXWithVersionsDUnitTest.java   |   5 +
 .../cache/execute/PRColocationDUnitTest.java    |   6 +-
 .../cache/execute/PRTransactionDUnitTest.java   | 131 ++++++++++++++++++-
 5 files changed, 151 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index f7ecdaf..df52764 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -4105,6 +4105,12 @@ public class PartitionedRegion extends LocalRegion implements
             retryTime.waitToRetryNode();
           }
         } else {
+          if (prce instanceof BucketNotFoundException) {
+            TransactionException ex = new TransactionDataNotColocatedException(LocalizedStrings.
+                PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(key));
+            ex.initCause(prce);
+            throw ex;
+          }
           Throwable cause = prce.getCause();
           if (cause instanceof PrimaryBucketException) {
             throw (PrimaryBucketException)cause;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index f36085b..68a83f1 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -37,6 +37,11 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest {
     return props;
   }
   
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.")
+  @Test
+  public void testTxWithNonColocatedGet() {
+  }
+  
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
   @Test
   public void testBasicPRTransactionRedundancy0() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index 268c2ed..d692468 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -37,6 +37,11 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni
     return props;
   }
   
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.")
+  @Test
+  public void testTxWithNonColocatedGet() {
+  }
+  
   @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
   @Override
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
index 1b8d2d1..f6ee565 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
@@ -2388,11 +2388,15 @@ public class PRColocationDUnitTest extends JUnit4CacheTestCase {
     assertTrue("Region should have failed to close. regionName = " + partitionedRegionName , exceptionThrown);    
   }
   public static void putCustomerPartitionedRegion(String partitionedRegionName) {
+    putCustomerPartitionedRegion(partitionedRegionName, 10);
+  }
+  
+  public static void putCustomerPartitionedRegion(String partitionedRegionName, int numOfRecord) {
     assertNotNull(basicGetCache());
     Region partitionedregion = basicGetCache().getRegion(Region.SEPARATOR
         + partitionedRegionName);
     assertNotNull(partitionedregion);
-    for (int i = 1; i <= 10; i++) {
+    for (int i = 1; i <= numOfRecord; i++) {
       CustId custid = new CustId(i);
       Customer customer = new Customer("name" + i, "Address" + i);
       try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 516c240..332ec01 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -25,10 +25,12 @@ import static org.junit.Assert.*;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
+import org.assertj.core.api.Assertions;
 
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import util.TestException;
@@ -45,6 +47,7 @@ import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.execute.data.CustId;
@@ -53,10 +56,13 @@ import org.apache.geode.internal.cache.execute.data.Order;
 import org.apache.geode.internal.cache.execute.data.OrderId;
 import org.apache.geode.internal.cache.execute.data.Shipment;
 import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
+import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableRunnable;
 
 /**
  * Test for co-located PR transactions.
@@ -316,18 +322,141 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
   }
 
   protected void createPRWithCoLocation(String prName, String coLocatedWith) {
+    setAttributes(prName, coLocatedWith);
+    createPartitionedRegion(attributeObjects);
+  }
+  
+  protected void setAttributes(String prName, String coLocatedWith) {
     this.regionName = prName;
     this.colocatedWith = coLocatedWith;
     this.isPartitionResolver = new Boolean(true);
     this.attributeObjects = new Object[] { regionName, redundancy, localMaxmemory,
         totalNumBuckets, colocatedWith, isPartitionResolver, getEnableConcurrency() };
-    createPartitionedRegion(attributeObjects);
   }
 
   protected boolean getEnableConcurrency() {
     return false;
   }
   
+  /**
+   * This method executes a transaction with get on non colocated entries and 
+   * expects the transaction to fail with TransactionDataNotColocatedException.
+   * @param bucketRedundancy redundancy for the colocated PRs
+   */
+  protected void baiscPRTXWithNonColocatedGet(int bucketRedundancy) {
+    dataStore1.invoke(runGetCache);
+    dataStore2.invoke(runGetCache);
+    redundancy = new Integer(bucketRedundancy);
+    localMaxmemory = new Integer(50);
+    totalNumBuckets = new Integer(2);
+    
+    setAttributes(CustomerPartitionedRegionName, null);
+
+    dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+    dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+
+    // Put the customer 1-2 in CustomerPartitionedRegion
+    dataStore1.invoke(() -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 2));
+
+    dataStore1.invoke(verifyNonColocated);
+    dataStore2.invoke(verifyNonColocated);
+    
+    dataStore1.invoke(getTx);
+  }
+  
+
+  @SuppressWarnings("serial")
+  private SerializableRunnable verifyNonColocated = new SerializableRunnable("verifyNonColocated") {
+    @Override
+    public void run() throws PRLocallyDestroyedException, ForceReattemptException {
+      containsKeyLocally();
+    }
+  };
+  
+  @SuppressWarnings("serial")
+  private SerializableRunnable getTx = new SerializableRunnable("getTx") {
+    @Override
+    public void run() {
+      performGetTx();
+    }
+  };
+
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void containsKeyLocally() throws PRLocallyDestroyedException, ForceReattemptException {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    
+    CustId cust1 = new CustId(1);
+    CustId cust2 = new CustId(2);
+    int bucketId1 = pr.getKeyInfo(cust1).getBucketId();
+    int bucketId2 = pr.getKeyInfo(cust2).getBucketId();
+    
+    List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
+    Set localBucket1Keys;
+    Set localBucket2Keys;
+    assertTrue(localPrimaryBucketList.size() == 1);
+    for (int bucketId: localPrimaryBucketList) {
+      if (bucketId == bucketId1) {
+        //primary bucket has cust1
+        localBucket1Keys = pr.getDataStore().getKeysLocally(bucketId1, false);
+        for (Object key: localBucket1Keys) {
+          LogService.getLogger().info("local key set contains " + key);
+        }
+        assertTrue(localBucket1Keys.size() == 1);
+      } else {
+        localBucket2Keys = pr.getDataStore().getKeysLocally(bucketId2, false);
+        for (Object key: localBucket2Keys) {
+          LogService.getLogger().info("local key set contains " + key);
+        }
+        assertTrue(localBucket2Keys.size() == 1);
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void performGetTx() {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager();
+    CustId cust1 = new CustId(1);
+    CustId cust2 = new CustId(2);
+    int bucketId1 = pr.getKeyInfo(cust1).getBucketId();
+    List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
+    assertTrue(localPrimaryBucketList.size() == 1);
+    boolean isCust1Local = (Integer)localPrimaryBucketList.get(0) == bucketId1;
+
+    //touch first get on remote node -- using TXStateStub
+    Assertions.assertThatThrownBy(()-> getTx(!isCust1Local, mgr, pr, cust1, cust2))
+    .isInstanceOf(TransactionDataNotColocatedException.class);
+
+   //touch first get on local node-- using TXState
+    Assertions.assertThatThrownBy(()-> getTx(isCust1Local, mgr, pr, cust1, cust2))
+    .isInstanceOf(TransactionDataNotColocatedException.class);
+  }
+  
+  private void getTx(boolean doCust1First, CacheTransactionManager mgr, PartitionedRegion pr, CustId cust1, CustId cust2) {
+    CustId first = doCust1First ? cust1 : cust2;
+    CustId second = !doCust1First ? cust1 : cust2;
+    
+    mgr.begin();
+    boolean doRollback = true;
+    try {
+      pr.get(first);
+      pr.get(second);
+      doRollback = false;
+    } finally {
+      if (doRollback) {
+        mgr.rollback();
+      } else {
+        mgr.commit();
+      }
+    }
+  }
+  
+  @Test
+  public void testTxWithNonColocatedGet() {
+    baiscPRTXWithNonColocatedGet(0);
+  }
+  
   @Test
   public void testPRTXInCacheListenerRedundancy0() {
     basicPRTXInCacheListener(0);