You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/01 21:03:26 UTC

[32/50] [abbrv] geode git commit: GEODE-3310: Set target node in TXStateProxy during TXFailover

GEODE-3310: Set target node in TXStateProxy during TXFailover

	During tx failover, the new server (the one the transaction is failing over to) needs to set the target node in its TXStateProxy.
	Add a unit test to verify the fix.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/03e11e87
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/03e11e87
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/03e11e87

Branch: refs/heads/feature/GEODE-3299
Commit: 03e11e8799a5782cf943f6e20cf55ae55aba56d9
Parents: e24438b
Author: eshu <es...@pivotal.io>
Authored: Thu Jul 27 16:48:34 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu Jul 27 16:51:56 2017 -0700

----------------------------------------------------------------------
 .../tier/sockets/command/TXFailoverCommand.java |  10 +-
 .../cache/execute/MyTransactionFunction.java    |  16 +++
 .../cache/execute/PRTransactionDUnitTest.java   | 130 +++++++++++++------
 .../sockets/command/TXFailoverCommandTest.java  |  84 ++++++++++++
 4 files changed, 202 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
index 6bd00c0..18b7c62 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
@@ -61,7 +61,7 @@ public class TXFailoverCommand extends BaseCommand {
     if (logger.isDebugEnabled()) {
       logger.debug("TX: Transaction {} from {} is failing over to this server", uniqId, client);
     }
-    TXId txId = new TXId(client, uniqId);
+    TXId txId = createTXId(client, uniqId);
     TXManagerImpl mgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
     mgr.waitForCompletingTransaction(txId); // in case it's already completing here in another
                                             // thread
@@ -92,6 +92,10 @@ public class TXFailoverCommand extends BaseCommand {
               "TX: txState is not local, bootstrapping PeerTXState stub for targetNode: {}",
               hostingMember);
         }
+        // GEODE-3310 set the target node in the tx
+        if (tx.getTarget() == null) {
+          tx.setTarget(hostingMember);
+        }
         // inject the real deal
         tx.setLocalTXState(new PeerTXStateStub(tx, hostingMember, client));
       } else {
@@ -127,4 +131,8 @@ public class TXFailoverCommand extends BaseCommand {
     serverConnection.setAsTrue(RESPONDED);
   }
 
+  TXId createTXId(InternalDistributedMember client, int uniqId) {
+    return new TXId(client, uniqId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
index 0970836..10e3282 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
@@ -86,6 +86,9 @@ public class MyTransactionFunction implements Function {
       case PRTransactionDUnitTest.VERIFY_REP_READ:
         verifyRepeatableRead(ctx);
         break;
+      case PRTransactionDUnitTest.UPDATE_NON_COLOCATION:
+        updateNonColocation(ctx);
+        break;
     }
     context.getResultSender().lastResult(null);
   }
@@ -476,6 +479,19 @@ public class MyTransactionFunction implements Function {
     mImp.commit();
   }
 
+  private void updateNonColocation(RegionFunctionContext ctx) {
+    Region custPR = ctx.getDataSet();
+
+    ArrayList args = (ArrayList) ctx.getArguments();
+    CustId custId = (CustId) args.get(1);
+    Customer newCus = (Customer) args.get(2);
+
+    custPR.put(custId, newCus);
+    Assert.assertTrue(custPR.containsKey(custId));
+    Assert.assertTrue(custPR.containsValueForKey(custId));
+
+  }
+
   public boolean hasResult() {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 7a56644..6578baa 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.logging.log4j.Logger;
 import org.assertj.core.api.Assertions;
 
 import java.util.ArrayList;
@@ -40,6 +41,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.TransactionDataNotColocatedException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
@@ -74,6 +76,7 @@ import org.apache.geode.test.dunit.SerializableRunnable;
  */
 @Category(DistributedTest.class)
 public class PRTransactionDUnitTest extends PRColocationDUnitTest {
+  private static final Logger logger = LogService.getLogger();
 
   public static final int VERIFY_TX = 0;
 
@@ -91,6 +94,8 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
 
   public static final int VERIFY_REP_READ = 8;
 
+  public static final int UPDATE_NON_COLOCATION = 9;
+
   final int totalIterations = 50;
 
   final int warmupIterations = 10;
@@ -131,6 +136,73 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
     basicPRTXInFunction(2, false);
   }
 
+  @Test
+  public void testBasicPRTransactionNonColatedFunction0() {
+    basicPRTXInNonColocatedFunction(0);
+  }
+
+  /**
+   * Test two non colocated functions in a transaction. This method invokes
+   * {@link MyTransactionFunction} and tells it what to test, using different arguments.
+   * 
+   * @param redundantBuckets redundant buckets for colocated PRs
+   */
+  protected void basicPRTXInNonColocatedFunction(int redundantBuckets) {
+    setupColocatedRegions(redundantBuckets);
+
+    dataStore1.invoke(() -> registerFunction());
+    dataStore2.invoke(() -> registerFunction());
+
+    dataStore1.invoke(() -> runTXFunctions());
+  }
+
+  private void registerFunction() {
+    logger.info("register Fn");
+    Function txFunction = new MyTransactionFunction();
+    FunctionService.registerFunction(txFunction);
+  }
+
+  private void runFunction(final Region pr, int cust, boolean isFirstFunc) {
+    CustId custId = new CustId(cust);
+    Customer newCus = new Customer("foo", "bar");
+    ArrayList args = new ArrayList();
+    Execution execution = FunctionService.onRegion(pr);
+    Set filter = new HashSet();
+
+    args.add(new Integer(UPDATE_NON_COLOCATION));
+    logger.info("UPDATE_NON_COLOCATION");
+    args.add(custId);
+    args.add(newCus);
+    filter.add(custId);
+    try {
+      execution.withFilter(filter).setArguments(args).execute(new MyTransactionFunction().getId())
+          .getResult();
+      assertTrue("Expected exception was not thrown", isFirstFunc);
+    } catch (Exception exp) {
+      if (!isFirstFunc) {
+        if (exp instanceof TransactionException && exp.getMessage()
+            .startsWith("Function execution is not colocated with transaction.")) {
+        } else {
+          logger.info("Expected to catch TransactionException but caught exception " + exp, exp);
+          Assert.fail("Expected to catch TransactionException but caught exception ", exp);
+        }
+      } else {
+        logger.info("Caught unexpected exception", exp);
+        Assert.fail("Unexpected exception was thrown", exp);
+      }
+    }
+  }
+
+  private void runTXFunctions() {
+    PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+        .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+    CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager();
+    mgr.begin();
+    runFunction(pr, 1, true);
+    runFunction(pr, 2, false);
+    mgr.commit();
+  }
+
   /**
    * Test all the basic functionality of colocated transactions. This method invokes
    * {@link MyTransactionFunction} and tells it what to test, using different arguments.
@@ -146,17 +218,9 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
       createColocatedPRs(redundantBuckets);
     }
 
-    SerializableCallable registerFunction = new SerializableCallable("register Fn") {
-      public Object call() throws Exception {
-        Function txFunction = new MyTransactionFunction();
-        FunctionService.registerFunction(txFunction);
-        return Boolean.TRUE;
-      }
-    };
-
-    dataStore1.invoke(registerFunction);
-    dataStore2.invoke(registerFunction);
-    dataStore3.invoke(registerFunction);
+    dataStore1.invoke(() -> registerFunction());
+    dataStore2.invoke(() -> registerFunction());
+    dataStore3.invoke(() -> registerFunction());
 
     accessor.invoke(new SerializableCallable("run function") {
       public Object call() throws Exception {
@@ -171,7 +235,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         ArrayList args = new ArrayList();
         Function txFunction = new MyTransactionFunction();
         FunctionService.registerFunction(txFunction);
-        Execution e = FunctionService.onRegion(pr);
+        Execution execution = FunctionService.onRegion(pr);
         Set filter = new HashSet();
         // test transaction non-coLocated operations
         filter.clear();
@@ -184,7 +248,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         args.add(order);
         filter.add(custId);
         try {
-          e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+          execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
           fail("Expected exception was not thrown");
         } catch (FunctionException fe) {
           LogWriterUtils.getLogWriter().info("Caught Expected exception");
@@ -201,7 +265,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         LogWriterUtils.getLogWriter().info("VERIFY_TX");
         orderpr.put(orderId, order);
         assertNotNull(orderpr.get(orderId));
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
         assertTrue("Unexpected customer value after commit", newCus.equals(pr.get(custId)));
         Order commitedOrder = (Order) orderpr.get(orderId);
         assertTrue(
@@ -209,26 +273,25 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
             order.equals(commitedOrder));
         // verify conflict detection
         args.set(0, new Integer(VERIFY_TXSTATE_CONFLICT));
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
         // verify that the transaction is rolled back
         args.set(0, new Integer(VERIFY_ROLLBACK));
         LogWriterUtils.getLogWriter().info("VERIFY_ROLLBACK");
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
         // verify destroy
         args.set(0, new Integer(VERIFY_DESTROY));
         LogWriterUtils.getLogWriter().info("VERIFY_DESTROY");
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
         // verify invalidate
         args.set(0, new Integer(VERIFY_INVALIDATE));
         LogWriterUtils.getLogWriter().info("VERIFY_INVALIDATE");
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
         return Boolean.TRUE;
       }
     });
   }
 
   protected void createColocatedPRs(int redundantBuckets) {
-
     createCacheInAllVms();
 
     redundancy = new Integer(redundantBuckets);
@@ -919,7 +982,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         ArrayList args = new ArrayList();
         Function txFunction = new MyTransactionFunction();
         FunctionService.registerFunction(txFunction);
-        Execution e = FunctionService.onRegion(pr);
+        Execution execution = FunctionService.onRegion(pr);
         Set filter = new HashSet();
         boolean caughtException = false;
         // test transaction non-coLocated operations
@@ -933,7 +996,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         args.add(order);
         filter.add(custId);
         caughtException = false;
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
         return null;
       }
     });
@@ -943,17 +1006,10 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
   @Test
   public void testRepeatableRead() throws Exception {
     createColocatedPRs(1);
-    SerializableCallable registerFunction = new SerializableCallable("register Fn") {
-      public Object call() throws Exception {
-        Function txFunction = new MyTransactionFunction();
-        FunctionService.registerFunction(txFunction);
-        return Boolean.TRUE;
-      }
-    };
 
-    dataStore1.invoke(registerFunction);
-    dataStore2.invoke(registerFunction);
-    dataStore3.invoke(registerFunction);
+    dataStore1.invoke(() -> registerFunction());
+    dataStore2.invoke(() -> registerFunction());
+    dataStore3.invoke(() -> registerFunction());
 
     accessor.invoke(new SerializableCallable("run function") {
       public Object call() throws Exception {
@@ -968,7 +1024,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         ArrayList args = new ArrayList();
         Function txFunction = new MyTransactionFunction();
         FunctionService.registerFunction(txFunction);
-        Execution e = FunctionService.onRegion(pr);
+        Execution execution = FunctionService.onRegion(pr);
         Set filter = new HashSet();
         filter.clear();
         args.clear();
@@ -979,7 +1035,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
         args.add(orderId);
         args.add(order);
         filter.add(custId);
-        e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+        execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
 
         return null;
       }
@@ -1013,7 +1069,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
       public Object call() throws Exception {
         long perfTime = 0;
         Region customerPR = basicGetCache().getRegion(CustomerPartitionedRegionName);
-        Execution e = FunctionService.onRegion(customerPR);
+        Execution execution = FunctionService.onRegion(customerPR);
         // for each customer, update order and shipment
         for (int iterations = 1; iterations <= totalIterations; iterations++) {
           LogWriterUtils.getLogWriter().info("running perfFunction");
@@ -1036,7 +1092,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
           if (iterations > warmupIterations) {
             startTime = NanoTimer.getTime();
           }
-          e.withFilter(filter).setArguments(args).execute("perfFunction").getResult();
+          execution.withFilter(filter).setArguments(args).execute("perfFunction").getResult();
           if (startTime > 0) {
             perfTime += NanoTimer.getTime() - startTime;
           }
@@ -1051,7 +1107,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
       public Object call() throws Exception {
         long perfTime = 0;
         Region customerPR = basicGetCache().getRegion(CustomerPartitionedRegionName);
-        Execution e = FunctionService.onRegion(customerPR);
+        Execution execution = FunctionService.onRegion(customerPR);
         // for each customer, update order and shipment
         for (int iterations = 1; iterations <= totalIterations; iterations++) {
           LogWriterUtils.getLogWriter().info("Running perfFunction");
@@ -1074,7 +1130,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
           if (iterations > warmupIterations) {
             startTime = NanoTimer.getTime();
           }
-          e.withFilter(filter).setArguments(args).execute("perfTxFunction").getResult();
+          execution.withFilter(filter).setArguments(args).execute("perfTxFunction").getResult();
           if (startTime > 0) {
             perfTime += NanoTimer.getTime() - startTime;
           }

http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java
new file mode 100644
index 0000000..654be90
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.FindRemoteTXMessage;
+import org.apache.geode.internal.cache.FindRemoteTXMessage.FindRemoteTXMessageReplyProcessor;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest({FindRemoteTXMessage.class})
+public class TXFailoverCommandTest {
+  @Test
+  public void testTXFailoverSettingTargetNode()
+      throws ClassNotFoundException, IOException, InterruptedException {
+    TXFailoverCommand cmd = mock(TXFailoverCommand.class);
+    Message msg = mock(Message.class);
+    ServerConnection serverConnection = mock(ServerConnection.class);
+    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+    InternalDistributedMember client = mock(InternalDistributedMember.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    InternalCache cache = Fakes.cache();
+    int uniqueId = 1;
+    TXId txId = new TXId(client, uniqueId);
+    TXStateProxyImpl proxy = new TXStateProxyImpl(txMgr, txId, null);
+    FindRemoteTXMessageReplyProcessor processor = mock(FindRemoteTXMessageReplyProcessor.class);
+    InternalDistributedMember host = mock(InternalDistributedMember.class);
+
+    doCallRealMethod().when(cmd).cmdExecute(msg, serverConnection, null, 1);
+    when(serverConnection.getProxyID()).thenReturn(clientProxyMembershipID);
+    when(clientProxyMembershipID.getDistributedMember()).thenReturn(client);
+    when(msg.getTransactionId()).thenReturn(uniqueId);
+    when(serverConnection.getCache()).thenReturn(cache);
+    when(cache.getCacheTransactionManager()).thenReturn(txMgr);
+    when(txMgr.getTXState()).thenReturn(proxy);
+    when(cmd.createTXId(client, uniqueId)).thenReturn(txId);
+    PowerMockito.mockStatic(FindRemoteTXMessage.class);
+    PowerMockito.when(FindRemoteTXMessage.send(cache, txId)).thenReturn(processor);
+    when(processor.getHostingMember()).thenReturn(host);
+    when(proxy.getCache()).thenReturn(cache);
+    when(cache.getDistributedSystem()).thenReturn(mock(DistributedSystem.class));
+
+    cmd.cmdExecute(msg, serverConnection, null, 1);
+    assertNotNull(proxy.getRealDeal(host));
+    assertEquals(proxy.getTarget(), host);
+  }
+}