You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/10/31 20:56:22 UTC
[42/50] [abbrv] incubator-geode git commit: GEODE-2021: Non colocated
gets in a transaction should get TransactionDataNotColocatedException
GEODE-2021: Non colocated gets in a transaction should get TransactionDataNotColocatedException
Throw TransactionDataNotColocatedException when a local get fails with BucketNotFoundException.
Added a dunit test that runs two transactions whose gets use either TXStateStub or TXState, depending on where the data is located.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/56917a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/56917a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/56917a26
Branch: refs/heads/feature/GEODE-1930
Commit: 56917a26a8916b83f0cec6e85285b5040ff66ee6
Parents: fadd92b
Author: eshu <es...@pivotal.io>
Authored: Fri Oct 21 11:43:36 2016 -0700
Committer: eshu <es...@pivotal.io>
Committed: Fri Oct 21 11:43:36 2016 -0700
----------------------------------------------------------------------
.../geode/internal/cache/PartitionedRegion.java | 6 +
.../apache/geode/disttx/PRDistTXDUnitTest.java | 5 +
.../disttx/PRDistTXWithVersionsDUnitTest.java | 5 +
.../cache/execute/PRColocationDUnitTest.java | 6 +-
.../cache/execute/PRTransactionDUnitTest.java | 131 ++++++++++++++++++-
5 files changed, 151 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index f7ecdaf..df52764 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -4105,6 +4105,12 @@ public class PartitionedRegion extends LocalRegion implements
retryTime.waitToRetryNode();
}
} else {
+ if (prce instanceof BucketNotFoundException) {
+ TransactionException ex = new TransactionDataNotColocatedException(LocalizedStrings.
+ PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(key));
+ ex.initCause(prce);
+ throw ex;
+ }
Throwable cause = prce.getCause();
if (cause instanceof PrimaryBucketException) {
throw (PrimaryBucketException)cause;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index f36085b..68a83f1 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -37,6 +37,11 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest {
return props;
}
+ @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.")
+ @Test
+ public void testTxWithNonColocatedGet() {
+ }
+
@Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
@Test
public void testBasicPRTransactionRedundancy0() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index 268c2ed..d692468 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -37,6 +37,11 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni
return props;
}
+ @Ignore("[DISTTX] TODO test overridden and intentionally left blank as it does not apply to disttx.")
+ @Test
+ public void testTxWithNonColocatedGet() {
+ }
+
@Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
@Override
@Test
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
index 1b8d2d1..f6ee565 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRColocationDUnitTest.java
@@ -2388,11 +2388,15 @@ public class PRColocationDUnitTest extends JUnit4CacheTestCase {
assertTrue("Region should have failed to close. regionName = " + partitionedRegionName , exceptionThrown);
}
public static void putCustomerPartitionedRegion(String partitionedRegionName) {
+ putCustomerPartitionedRegion(partitionedRegionName, 10);
+ }
+
+ public static void putCustomerPartitionedRegion(String partitionedRegionName, int numOfRecord) {
assertNotNull(basicGetCache());
Region partitionedregion = basicGetCache().getRegion(Region.SEPARATOR
+ partitionedRegionName);
assertNotNull(partitionedregion);
- for (int i = 1; i <= 10; i++) {
+ for (int i = 1; i <= numOfRecord; i++) {
CustId custid = new CustId(i);
Customer customer = new Customer("name" + i, "Address" + i);
try {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/56917a26/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 516c240..332ec01 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -25,10 +25,12 @@ import static org.junit.Assert.*;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
+import org.assertj.core.api.Assertions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import util.TestException;
@@ -45,6 +47,7 @@ import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.execute.data.CustId;
@@ -53,10 +56,13 @@ import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
+import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableRunnable;
/**
* Test for co-located PR transactions.
@@ -316,18 +322,141 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
}
protected void createPRWithCoLocation(String prName, String coLocatedWith) {
+ setAttributes(prName, coLocatedWith);
+ createPartitionedRegion(attributeObjects);
+ }
+
+ protected void setAttributes(String prName, String coLocatedWith) {
this.regionName = prName;
this.colocatedWith = coLocatedWith;
this.isPartitionResolver = new Boolean(true);
this.attributeObjects = new Object[] { regionName, redundancy, localMaxmemory,
totalNumBuckets, colocatedWith, isPartitionResolver, getEnableConcurrency() };
- createPartitionedRegion(attributeObjects);
}
protected boolean getEnableConcurrency() {
return false;
}
+ /**
+ * This method executes a transaction with get on non colocated entries and
+ * expects the transaction to fail with TransactionDataNotColocatedException.
+ * @param bucketRedundancy redundancy for the colocated PRs
+ */
+ protected void baiscPRTXWithNonColocatedGet(int bucketRedundancy) {
+ dataStore1.invoke(runGetCache);
+ dataStore2.invoke(runGetCache);
+ redundancy = new Integer(bucketRedundancy);
+ localMaxmemory = new Integer(50);
+ totalNumBuckets = new Integer(2);
+
+ setAttributes(CustomerPartitionedRegionName, null);
+
+ dataStore1.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+ dataStore2.invoke(PRColocationDUnitTest.class, "createPR", this.attributeObjects);
+
+ // Put the customer 1-2 in CustomerPartitionedRegion
+ dataStore1.invoke(() -> PRColocationDUnitTest.putCustomerPartitionedRegion(CustomerPartitionedRegionName, 2));
+
+ dataStore1.invoke(verifyNonColocated);
+ dataStore2.invoke(verifyNonColocated);
+
+ dataStore1.invoke(getTx);
+ }
+
+
+ @SuppressWarnings("serial")
+ private SerializableRunnable verifyNonColocated = new SerializableRunnable("verifyNonColocated") {
+ @Override
+ public void run() throws PRLocallyDestroyedException, ForceReattemptException {
+ containsKeyLocally();
+ }
+ };
+
+ @SuppressWarnings("serial")
+ private SerializableRunnable getTx = new SerializableRunnable("getTx") {
+ @Override
+ public void run() {
+ performGetTx();
+ }
+ };
+
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void containsKeyLocally() throws PRLocallyDestroyedException, ForceReattemptException {
+ PartitionedRegion pr = (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+
+ CustId cust1 = new CustId(1);
+ CustId cust2 = new CustId(2);
+ int bucketId1 = pr.getKeyInfo(cust1).getBucketId();
+ int bucketId2 = pr.getKeyInfo(cust2).getBucketId();
+
+ List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
+ Set localBucket1Keys;
+ Set localBucket2Keys;
+ assertTrue(localPrimaryBucketList.size() == 1);
+ for (int bucketId: localPrimaryBucketList) {
+ if (bucketId == bucketId1) {
+ //primary bucket has cust1
+ localBucket1Keys = pr.getDataStore().getKeysLocally(bucketId1, false);
+ for (Object key: localBucket1Keys) {
+ LogService.getLogger().info("local key set contains " + key);
+ }
+ assertTrue(localBucket1Keys.size() == 1);
+ } else {
+ localBucket2Keys = pr.getDataStore().getKeysLocally(bucketId2, false);
+ for (Object key: localBucket2Keys) {
+ LogService.getLogger().info("local key set contains " + key);
+ }
+ assertTrue(localBucket2Keys.size() == 1);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void performGetTx() {
+ PartitionedRegion pr = (PartitionedRegion) basicGetCache().getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+ CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager();
+ CustId cust1 = new CustId(1);
+ CustId cust2 = new CustId(2);
+ int bucketId1 = pr.getKeyInfo(cust1).getBucketId();
+ List<Integer> localPrimaryBucketList = pr.getLocalPrimaryBucketsListTestOnly();
+ assertTrue(localPrimaryBucketList.size() == 1);
+ boolean isCust1Local = (Integer)localPrimaryBucketList.get(0) == bucketId1;
+
+ //touch first get on remote node -- using TXStateStub
+ Assertions.assertThatThrownBy(()-> getTx(!isCust1Local, mgr, pr, cust1, cust2))
+ .isInstanceOf(TransactionDataNotColocatedException.class);
+
+ //touch first get on local node-- using TXState
+ Assertions.assertThatThrownBy(()-> getTx(isCust1Local, mgr, pr, cust1, cust2))
+ .isInstanceOf(TransactionDataNotColocatedException.class);
+ }
+
+ private void getTx(boolean doCust1First, CacheTransactionManager mgr, PartitionedRegion pr, CustId cust1, CustId cust2) {
+ CustId first = doCust1First ? cust1 : cust2;
+ CustId second = !doCust1First ? cust1 : cust2;
+
+ mgr.begin();
+ boolean doRollback = true;
+ try {
+ pr.get(first);
+ pr.get(second);
+ doRollback = false;
+ } finally {
+ if (doRollback) {
+ mgr.rollback();
+ } else {
+ mgr.commit();
+ }
+ }
+ }
+
+ @Test
+ public void testTxWithNonColocatedGet() {
+ baiscPRTXWithNonColocatedGet(0);
+ }
+
@Test
public void testPRTXInCacheListenerRedundancy0() {
basicPRTXInCacheListener(0);