You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/08/03 18:01:21 UTC

geode git commit: GEODE-3379: Geode transaction need to commit on primary buckets only.

Repository: geode
Updated Branches:
  refs/heads/develop df104fcee -> f7198be32


GEODE-3379: Geode transaction need to commit on primary buckets only.

	Fail the transaction commit if primary buckets have been moved from transaction hosting node.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/f7198be3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/f7198be3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/f7198be3

Branch: refs/heads/develop
Commit: f7198be3228e333f65ad235ba06a0844ae258208
Parents: df104fc
Author: eshu <es...@pivotal.io>
Authored: Thu Aug 3 10:56:53 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu Aug 3 10:56:53 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/cache/TXState.java    |  6 +-
 .../apache/geode/disttx/PRDistTXDUnitTest.java  |  4 +
 .../disttx/PRDistTXWithVersionsDUnitTest.java   |  4 +
 .../cache/execute/PRTransactionDUnitTest.java   | 94 +++++++++++++++-----
 4 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 2c8c28b..55415e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -668,7 +668,11 @@ public class TXState implements TXStateInterface {
       while (it.hasNext()) {
         Map.Entry<LocalRegion, TXRegionState> me = it.next();
         LocalRegion r = me.getKey();
-        if (r instanceof BucketRegion && (((BucketRegion) r).getBucketAdvisor().isPrimary())) {
+        if (r instanceof BucketRegion) {
+          if (isDistTx() && !((BucketRegion) r).getBucketAdvisor().isPrimary()) {
+            // For distTx we skip for taking locks on secondary.
+            continue;
+          }
           BucketRegion b = (BucketRegion) r;
           /*
            * Lock the primary bucket so it doesnt get rebalanced until we cleanup!

http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
index 318237e..a4f64ae 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXDUnitTest.java
@@ -80,4 +80,8 @@ public class PRDistTXDUnitTest extends PRTransactionDUnitTest {
   @Test
   public void testBasicPRTransactionNonColocatedFunction0() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank as they fail.")
+  @Override
+  @Test
+  public void testCommitToFailAfterPrimaryBucketMoved() {}
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
index 10a4394..1d92649 100644
--- a/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/disttx/PRDistTXWithVersionsDUnitTest.java
@@ -86,4 +86,8 @@ public class PRDistTXWithVersionsDUnitTest extends PRTransactionWithVersionsDUni
   @Test
   public void testBasicPRTransactionNonColocatedFunction0() {}
 
+  @Ignore("[DISTTX] TODO test overridden and intentionally left blank for distTx.")
+  @Override
+  @Test
+  public void testCommitToFailAfterPrimaryBucketMoved() {}
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/f7198be3/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index ad551ee..88e4007 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -18,16 +18,15 @@ import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
+import static com.googlecode.catchexception.CatchException.catchException;
+import static com.googlecode.catchexception.CatchException.caughtException;
 import static org.junit.Assert.*;
 
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.logging.log4j.Logger;
 import org.assertj.core.api.Assertions;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -50,6 +49,7 @@ import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -62,6 +62,9 @@ import org.apache.geode.internal.cache.execute.data.OrderId;
 import org.apache.geode.internal.cache.execute.data.Shipment;
 import org.apache.geode.internal.cache.execute.data.ShipmentId;
 import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
+import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperatorImpl;
+import org.apache.geode.internal.cache.partitioned.rebalance.ExplicitMoveDirector;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.Invoke;
@@ -542,11 +545,11 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
   protected void basicPRTXWithOpOnMovedBucket(Op op, int bucketRedundancy, DistributedMember dm1,
       DistributedMember dm2) {
     // First transaction.
-    TransactionId txId = (TransactionId) dataStore1.invoke(beginTx());
+    TransactionId txId = dataStore1.invoke(() -> beginTx(false));
     dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
 
     // Second one. Will go through different path (using TXState or TXStateStub)
-    txId = (TransactionId) dataStore1.invoke(beginTx());
+    txId = dataStore1.invoke(() -> beginTx(false));
     dataStore1.invoke(resumeTx(op, txId, dm1, dm2));
   }
 
@@ -560,21 +563,18 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
     };
   }
 
-  @SuppressWarnings({"rawtypes", "serial"})
-  private SerializableCallable beginTx() {
-    return new SerializableCallable("begin tx") {
-      @Override
-      public Object call() {
-        PartitionedRegion pr = (PartitionedRegion) basicGetCache()
-            .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
-        CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
-        CustId cust1 = new CustId(1);
-        mgr.begin();
-        Object value = pr.get(cust1);
-        assertNotNull(value);
-        return mgr.suspend();
-      }
-    };
+  private TransactionId beginTx(boolean doPut) {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
+    CustId cust1 = new CustId(1);
+    mgr.begin();
+    Object value = pr.get(cust1);
+    assertNotNull(value);
+    if (doPut) {
+      pr.put(cust1, "bar");
+    }
+    return mgr.suspend();
   }
 
   @SuppressWarnings("serial")
@@ -1152,6 +1152,60 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
 
   }
 
+  @Test
+  public void testCommitToFailAfterPrimaryBucketMoved() {
+    basicPRTXCommitToFailAfterPrimaryBucketMoved(1);
+  }
+
+  /**
+   * Test commit fail after the transactional node no longer hosts the primary bucket of the
+   * operations executed in the transaction.
+   *
+   * @param redundantBuckets redundant buckets for colocated PRs
+   */
+  protected void basicPRTXCommitToFailAfterPrimaryBucketMoved(int redundantBuckets) {
+    setupColocatedRegions(redundantBuckets);
+
+    InternalDistributedMember dm1 = (InternalDistributedMember) dataStore1.invoke(getDM());
+    InternalDistributedMember dm2 = (InternalDistributedMember) dataStore2.invoke(getDM());
+
+    TransactionId txId = dataStore1.invoke(() -> beginTx(true));
+    dataStore1.invoke(() -> movePrimaryBucket(dm1, dm2));
+    dataStore1.invoke(() -> resumeTxAfterPrimaryMoved(txId));
+
+  }
+
+  private void movePrimaryBucket(InternalDistributedMember dm1, InternalDistributedMember dm2) {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CustId cust1 = new CustId(1);
+    int bucketId = pr.getKeyInfo(cust1).getBucketId();
+    boolean isCust1LocalPrimary = pr.getBucketRegion(cust1).getBucketAdvisor().isPrimary();
+    InternalDistributedMember destination = isCust1LocalPrimary ? dm2 : dm1;
+    InternalDistributedMember source = isCust1LocalPrimary ? dm1 : dm2;
+
+    ExplicitMoveDirector director = new ExplicitMoveDirector(cust1, bucketId, source, destination,
+        pr.getCache().getDistributedSystem());
+    PartitionedRegionRebalanceOp rebalanceOp =
+        new PartitionedRegionRebalanceOp(pr, false, director, true, true);
+    BucketOperatorImpl operator = new BucketOperatorImpl(rebalanceOp);
+    boolean moved = operator.movePrimary(source, destination, bucketId);
+    if (!moved) {
+      fail("Not able to move primary bucket by invoking BucketOperatorImpl.movePrimary");
+    }
+  }
+
+  private void resumeTxAfterPrimaryMoved(TransactionId txId) {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CacheTransactionManager mgr = basicGetCache().getCacheTransactionManager();
+
+    mgr.resume(txId);
+
+    catchException(mgr).commit();
+    assertTrue(caughtException() instanceof TransactionDataRebalancedException);
+  }
+
   // Don't want to run the test twice
   @Test
   public void testColocatedPartitionedRegion() throws Throwable {}