You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/04 23:25:45 UTC
[17/27] geode git commit: GEODE-3379: Geode transactions need to
commit on primary buckets only.
GEODE-3379: Geode transactions need to commit on primary buckets only.
Fail the transaction commit if the primary buckets have been moved off the transaction-hosting node.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f7198be3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f7198be3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f7198be3
Branch: refs/heads/feature/GEODE-3299
Commit: f7198be3228e333f65ad235ba06a0844ae258208
Parents: df104fc
Author: eshu <es...@pivotal.io>
Authored: Thu Aug 3 10:56:53 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu Aug 3 10:56:53 2017 -0700
----------------------------------------------------------------------
.../apache/geode/internal/cache/TXState.java | 6 +-
.../apache/geode/disttx/PRDistTXDUnitTest.java | 4 +
.../disttx/PRDistTXWithVersionsDUnitTest.java | 4 +
.../cache/execute/PRTransactionDUnitTest.java | 94 +++++++++++++++-----
4 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 2c8c28b..55415e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -668,7 +668,11 @@ public class TXState implements TXStateInterface {
while (it.hasNext()) {
Map.Entry<LocalRegion, TXRegionState> me = it.next();
LocalRegion r = me.getKey();
- if (r instanceof BucketRegion && (((BucketRegion) r).getBucketAdvisor().isPrimary())) {
+ if (r instanceof BucketRegion) {
+ if (isDistTx() && !((BucketRegion) r).getBucketAdvisor().isPrimary()) {
+ // For distTx, skip taking locks on non-primary (secondary) buckets.
+ continue;
+ }
BucketRegion b = (BucketRegion) r;
/*
* Lock the primary bucket so it doesnt get rebalanced until we cleanup!
http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index 318237e..a4f64ae 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -80,4 +80,8 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest {
@Test
public void testBasicPRTransactionNonColocatedFunction0() {}
+ @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
+ @Override
+ @Test
+ public void testCommitToFailAfterPrimaryBucketMoved() {}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index 10a4394..1d92649 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -86,4 +86,8 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni
@Test
public void testBasicPRTransactionNonColocatedFunction0() {}
+ @Ignore("[DISTTX] TODO test overridden and intentionally left blank for distTx.")
+ @Override
+ @Test
+ public void testCommitToFailAfterPrimaryBucketMoved() {}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index ad551ee..88e4007 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -18,16 +18,15 @@ import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.Test;
+import static com.googlecode.catchexception.CatchException.catchException;
+import static com.googlecode.catchexception.CatchException.caughtException;
import static org.junit.Assert.*;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.logging.log4j.Logger;
import org.assertj.core.api.Assertions;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -50,6 +49,7 @@ import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -62,6 +62,9 @@ import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
+import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperatorImpl;
+import org.apache.geode.internal.cache.partitioned.rebalance.ExplicitMoveDirector;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.Invoke;
@@ -542,11 +545,11 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
protected void basicPRTXWithOpOnMovedBucket(Op op, int bucketRedundancy, DistributedMember dm1,
DistributedMember dm2) {
// First transaction.
- TransactionId txId = (TransactionId) dataStore1.invoke(beginTx());
+ TransactionId txId = dataStore1.invoke(() -> beginTx(false));
dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
// Second one. Will go through different path (using TXState or TXStateStub)
- txId = (TransactionId) dataStore1.invoke(beginTx());
+ txId = dataStore1.invoke(() -> beginTx(false));
dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
}
@@ -560,21 +563,18 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
};
}
- @SuppressWarnings({"rawtypes", "serial"})
- private SerializableCallable beginTx() {
- return new SerializableCallable("begin tx") {
- @Override
- public Object call() {
- PartitionedRegion pr = (PartitionedRegion) basicGetCache()
- .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
- CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
- CustId cust1 = new CustId(1);
- mgr.begin();
- Object value = pr.get(cust1);
- assertNotNull(value);
- return mgr.suspend();
- }
- };
+ private TransactionId beginTx(boolean doPut) {
+ PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+ .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+ CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
+ CustId cust1 = new CustId(1);
+ mgr.begin();
+ Object value = pr.get(cust1);
+ assertNotNull(value);
+ if (doPut) {
+ pr.put(cust1, "bar");
+ }
+ return mgr.suspend();
}
@SuppressWarnings("serial")
@@ -1152,6 +1152,60 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
}
+ @Test
+ public void testCommitToFailAfterPrimaryBucketMoved() {
+ basicPRTXCommitToFailAfterPrimaryBucketMoved(1);
+ }
+
+ /**
+ * Tests that commit fails after the transactional node no longer hosts the primary bucket of
+ * the operations executed in the transaction.
+ *
+ * @param redundantBuckets redundant buckets for colocated PRs
+ */
+ protected void basicPRTXCommitToFailAfterPrimaryBucketMoved(int redundantBuckets) {
+ setupColocatedRegions(redundantBuckets);
+
+ InternalDistributedMember dm1 = (InternalDistributedMember) dataStore1.invoke(getDM());
+ InternalDistributedMember dm2 = (InternalDistributedMember) dataStore2.invoke(getDM());
+
+ TransactionId txId = dataStore1.invoke(() -> beginTx(true));
+ dataStore1.invoke(() -> movePrimaryBucket(dm1, dm2));
+ dataStore1.invoke(() -> resumeTxAfterPrimaryMoved(txId));
+
+ }
+
+ private void movePrimaryBucket(InternalDistributedMember dm1, InternalDistributedMember dm2) {
+ PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+ .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+ CustId cust1 = new CustId(1);
+ int bucketId = pr.getKeyInfo(cust1).getBucketId();
+ boolean isCust1LocalPrimary = pr.getBucketRegion(cust1).getBucketAdvisor().isPrimary();
+ InternalDistributedMember destination = isCust1LocalPrimary ? dm2 : dm1;
+ InternalDistributedMember source = isCust1LocalPrimary ? dm1 : dm2;
+
+ ExplicitMoveDirector director = new ExplicitMoveDirector(cust1, bucketId, source, destination,
+ pr.getCache().getDistributedSystem());
+ PartitionedRegionRebalanceOp rebalanceOp =
+ new PartitionedRegionRebalanceOp(pr, false, director, true, true);
+ BucketOperatorImpl operator = new BucketOperatorImpl(rebalanceOp);
+ boolean moved = operator.movePrimary(source, destination, bucketId);
+ if (!moved) {
+ fail("Not able to move primary bucket by invoking BucketOperatorImpl.movePrimary");
+ }
+ }
+
+ private void resumeTxAfterPrimaryMoved(TransactionId txId) {
+ PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+ .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+ CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
+
+ mgr.resume(txId);
+
+ catchException(mgr).commit();
+ assertTrue(caughtException() instanceof TransactionDataRebalancedException);
+ }
+
// Don't want to run the test twice
@Test
public void testColocatedPartitionedRegion() throws Throwable {}