You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/01 21:03:26 UTC
[32/50] [abbrv] geode git commit: GEODE-3310: Set target node in
TXStateProxy during TXFailover
GEODE-3310: Set target node in TXStateProxy during TXFailover
During tx failover, the new server needs to set the target node in its TXStateProxy.
Add unit test to verify the fix.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/03e11e87
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/03e11e87
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/03e11e87
Branch: refs/heads/feature/GEODE-3299
Commit: 03e11e8799a5782cf943f6e20cf55ae55aba56d9
Parents: e24438b
Author: eshu <es...@pivotal.io>
Authored: Thu Jul 27 16:48:34 2017 -0700
Committer: eshu <es...@pivotal.io>
Committed: Thu Jul 27 16:51:56 2017 -0700
----------------------------------------------------------------------
.../tier/sockets/command/TXFailoverCommand.java | 10 +-
.../cache/execute/MyTransactionFunction.java | 16 +++
.../cache/execute/PRTransactionDUnitTest.java | 130 +++++++++++++------
.../sockets/command/TXFailoverCommandTest.java | 84 ++++++++++++
4 files changed, 202 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
index 6bd00c0..18b7c62 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommand.java
@@ -61,7 +61,7 @@ public class TXFailoverCommand extends BaseCommand {
if (logger.isDebugEnabled()) {
logger.debug("TX: Transaction {} from {} is failing over to this server", uniqId, client);
}
- TXId txId = new TXId(client, uniqId);
+ TXId txId = createTXId(client, uniqId);
TXManagerImpl mgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
mgr.waitForCompletingTransaction(txId); // in case it's already completing here in another
// thread
@@ -92,6 +92,10 @@ public class TXFailoverCommand extends BaseCommand {
"TX: txState is not local, bootstrapping PeerTXState stub for targetNode: {}",
hostingMember);
}
+ // GEODE-3310 set the target node in the tx
+ if (tx.getTarget() == null) {
+ tx.setTarget(hostingMember);
+ }
// inject the real deal
tx.setLocalTXState(new PeerTXStateStub(tx, hostingMember, client));
} else {
@@ -127,4 +131,8 @@ public class TXFailoverCommand extends BaseCommand {
serverConnection.setAsTrue(RESPONDED);
}
+ TXId createTXId(InternalDistributedMember client, int uniqId) {
+ return new TXId(client, uniqId);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
index 0970836..10e3282 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/MyTransactionFunction.java
@@ -86,6 +86,9 @@ public class MyTransactionFunction implements Function {
case PRTransactionDUnitTest.VERIFY_REP_READ:
verifyRepeatableRead(ctx);
break;
+ case PRTransactionDUnitTest.UPDATE_NON_COLOCATION:
+ updateNonColocation(ctx);
+ break;
}
context.getResultSender().lastResult(null);
}
@@ -476,6 +479,19 @@ public class MyTransactionFunction implements Function {
mImp.commit();
}
+ private void updateNonColocation(RegionFunctionContext ctx) {
+ Region custPR = ctx.getDataSet();
+
+ ArrayList args = (ArrayList) ctx.getArguments();
+ CustId custId = (CustId) args.get(1);
+ Customer newCus = (Customer) args.get(2);
+
+ custPR.put(custId, newCus);
+ Assert.assertTrue(custPR.containsKey(custId));
+ Assert.assertTrue(custPR.containsValueForKey(custId));
+
+ }
+
public boolean hasResult() {
return true;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
index 7a56644..6578baa 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRTransactionDUnitTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.logging.log4j.Logger;
import org.assertj.core.api.Assertions;
import java.util.ArrayList;
@@ -40,6 +41,7 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
@@ -74,6 +76,7 @@ import org.apache.geode.test.dunit.SerializableRunnable;
*/
@Category(DistributedTest.class)
public class PRTransactionDUnitTest extends PRColocationDUnitTest {
+ private static final Logger logger = LogService.getLogger();
public static final int VERIFY_TX = 0;
@@ -91,6 +94,8 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
public static final int VERIFY_REP_READ = 8;
+ public static final int UPDATE_NON_COLOCATION = 9;
+
final int totalIterations = 50;
final int warmupIterations = 10;
@@ -131,6 +136,73 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
basicPRTXInFunction(2, false);
}
+ @Test
+ public void testBasicPRTransactionNonColatedFunction0() {
+ basicPRTXInNonColocatedFunction(0);
+ }
+
+ /**
+ * Test two non colocated functions in a transaction. This method invokes
+ * {@link MyTransactionFunction} and tells it what to test, using different arguments.
+ *
+ * @param redundantBuckets redundant buckets for colocated PRs
+ */
+ protected void basicPRTXInNonColocatedFunction(int redundantBuckets) {
+ setupColocatedRegions(redundantBuckets);
+
+ dataStore1.invoke(() -> registerFunction());
+ dataStore2.invoke(() -> registerFunction());
+
+ dataStore1.invoke(() -> runTXFunctions());
+ }
+
+ private void registerFunction() {
+ logger.info("register Fn");
+ Function txFunction = new MyTransactionFunction();
+ FunctionService.registerFunction(txFunction);
+ }
+
+ private void runFunction(final Region pr, int cust, boolean isFirstFunc) {
+ CustId custId = new CustId(cust);
+ Customer newCus = new Customer("foo", "bar");
+ ArrayList args = new ArrayList();
+ Execution execution = FunctionService.onRegion(pr);
+ Set filter = new HashSet();
+
+ args.add(new Integer(UPDATE_NON_COLOCATION));
+ logger.info("UPDATE_NON_COLOCATION");
+ args.add(custId);
+ args.add(newCus);
+ filter.add(custId);
+ try {
+ execution.withFilter(filter).setArguments(args).execute(new MyTransactionFunction().getId())
+ .getResult();
+ assertTrue("Expected exception was not thrown", isFirstFunc);
+ } catch (Exception exp) {
+ if (!isFirstFunc) {
+ if (exp instanceof TransactionException && exp.getMessage()
+ .startsWith("Function execution is not colocated with transaction.")) {
+ } else {
+ logger.info("Expected to catch TransactionException but caught exception " + exp, exp);
+ Assert.fail("Expected to catch TransactionException but caught exception ", exp);
+ }
+ } else {
+ logger.info("Caught unexpected exception", exp);
+ Assert.fail("Unexpected exception was thrown", exp);
+ }
+ }
+ }
+
+ private void runTXFunctions() {
+ PartitionedRegion pr = (PartitionedRegion) basicGetCache()
+ .getRegion(Region.SEPARATOR + CustomerPartitionedRegionName);
+ CacheTransactionManager mgr = pr.getCache().getCacheTransactionManager();
+ mgr.begin();
+ runFunction(pr, 1, true);
+ runFunction(pr, 2, false);
+ mgr.commit();
+ }
+
/**
* Test all the basic functionality of colocated transactions. This method invokes
* {@link MyTransactionFunction} and tells it what to test, using different arguments.
@@ -146,17 +218,9 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
createColocatedPRs(redundantBuckets);
}
- SerializableCallable registerFunction = new SerializableCallable("register Fn") {
- public Object call() throws Exception {
- Function txFunction = new MyTransactionFunction();
- FunctionService.registerFunction(txFunction);
- return Boolean.TRUE;
- }
- };
-
- dataStore1.invoke(registerFunction);
- dataStore2.invoke(registerFunction);
- dataStore3.invoke(registerFunction);
+ dataStore1.invoke(() -> registerFunction());
+ dataStore2.invoke(() -> registerFunction());
+ dataStore3.invoke(() -> registerFunction());
accessor.invoke(new SerializableCallable("run function") {
public Object call() throws Exception {
@@ -171,7 +235,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
ArrayList args = new ArrayList();
Function txFunction = new MyTransactionFunction();
FunctionService.registerFunction(txFunction);
- Execution e = FunctionService.onRegion(pr);
+ Execution execution = FunctionService.onRegion(pr);
Set filter = new HashSet();
// test transaction non-coLocated operations
filter.clear();
@@ -184,7 +248,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
args.add(order);
filter.add(custId);
try {
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
fail("Expected exception was not thrown");
} catch (FunctionException fe) {
LogWriterUtils.getLogWriter().info("Caught Expected exception");
@@ -201,7 +265,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
LogWriterUtils.getLogWriter().info("VERIFY_TX");
orderpr.put(orderId, order);
assertNotNull(orderpr.get(orderId));
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
assertTrue("Unexpected customer value after commit", newCus.equals(pr.get(custId)));
Order commitedOrder = (Order) orderpr.get(orderId);
assertTrue(
@@ -209,26 +273,25 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
order.equals(commitedOrder));
// verify conflict detection
args.set(0, new Integer(VERIFY_TXSTATE_CONFLICT));
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
// verify that the transaction is rolled back
args.set(0, new Integer(VERIFY_ROLLBACK));
LogWriterUtils.getLogWriter().info("VERIFY_ROLLBACK");
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
// verify destroy
args.set(0, new Integer(VERIFY_DESTROY));
LogWriterUtils.getLogWriter().info("VERIFY_DESTROY");
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
// verify invalidate
args.set(0, new Integer(VERIFY_INVALIDATE));
LogWriterUtils.getLogWriter().info("VERIFY_INVALIDATE");
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
return Boolean.TRUE;
}
});
}
protected void createColocatedPRs(int redundantBuckets) {
-
createCacheInAllVms();
redundancy = new Integer(redundantBuckets);
@@ -919,7 +982,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
ArrayList args = new ArrayList();
Function txFunction = new MyTransactionFunction();
FunctionService.registerFunction(txFunction);
- Execution e = FunctionService.onRegion(pr);
+ Execution execution = FunctionService.onRegion(pr);
Set filter = new HashSet();
boolean caughtException = false;
// test transaction non-coLocated operations
@@ -933,7 +996,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
args.add(order);
filter.add(custId);
caughtException = false;
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
return null;
}
});
@@ -943,17 +1006,10 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
@Test
public void testRepeatableRead() throws Exception {
createColocatedPRs(1);
- SerializableCallable registerFunction = new SerializableCallable("register Fn") {
- public Object call() throws Exception {
- Function txFunction = new MyTransactionFunction();
- FunctionService.registerFunction(txFunction);
- return Boolean.TRUE;
- }
- };
- dataStore1.invoke(registerFunction);
- dataStore2.invoke(registerFunction);
- dataStore3.invoke(registerFunction);
+ dataStore1.invoke(() -> registerFunction());
+ dataStore2.invoke(() -> registerFunction());
+ dataStore3.invoke(() -> registerFunction());
accessor.invoke(new SerializableCallable("run function") {
public Object call() throws Exception {
@@ -968,7 +1024,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
ArrayList args = new ArrayList();
Function txFunction = new MyTransactionFunction();
FunctionService.registerFunction(txFunction);
- Execution e = FunctionService.onRegion(pr);
+ Execution execution = FunctionService.onRegion(pr);
Set filter = new HashSet();
filter.clear();
args.clear();
@@ -979,7 +1035,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
args.add(orderId);
args.add(order);
filter.add(custId);
- e.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
+ execution.withFilter(filter).setArguments(args).execute(txFunction.getId()).getResult();
return null;
}
@@ -1013,7 +1069,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
public Object call() throws Exception {
long perfTime = 0;
Region customerPR = basicGetCache().getRegion(CustomerPartitionedRegionName);
- Execution e = FunctionService.onRegion(customerPR);
+ Execution execution = FunctionService.onRegion(customerPR);
// for each customer, update order and shipment
for (int iterations = 1; iterations <= totalIterations; iterations++) {
LogWriterUtils.getLogWriter().info("running perfFunction");
@@ -1036,7 +1092,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
if (iterations > warmupIterations) {
startTime = NanoTimer.getTime();
}
- e.withFilter(filter).setArguments(args).execute("perfFunction").getResult();
+ execution.withFilter(filter).setArguments(args).execute("perfFunction").getResult();
if (startTime > 0) {
perfTime += NanoTimer.getTime() - startTime;
}
@@ -1051,7 +1107,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
public Object call() throws Exception {
long perfTime = 0;
Region customerPR = basicGetCache().getRegion(CustomerPartitionedRegionName);
- Execution e = FunctionService.onRegion(customerPR);
+ Execution execution = FunctionService.onRegion(customerPR);
// for each customer, update order and shipment
for (int iterations = 1; iterations <= totalIterations; iterations++) {
LogWriterUtils.getLogWriter().info("Running perfFunction");
@@ -1074,7 +1130,7 @@ public class PRTransactionDUnitTest extends PRColocationDUnitTest {
if (iterations > warmupIterations) {
startTime = NanoTimer.getTime();
}
- e.withFilter(filter).setArguments(args).execute("perfTxFunction").getResult();
+ execution.withFilter(filter).setArguments(args).execute("perfTxFunction").getResult();
if (startTime > 0) {
perfTime += NanoTimer.getTime() - startTime;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/03e11e87/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java
new file mode 100644
index 0000000..654be90
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXFailoverCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.FindRemoteTXMessage;
+import org.apache.geode.internal.cache.FindRemoteTXMessage.FindRemoteTXMessageReplyProcessor;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+
+@Category(UnitTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest({FindRemoteTXMessage.class})
+public class TXFailoverCommandTest {
+ @Test
+ public void testTXFailoverSettingTargetNode()
+ throws ClassNotFoundException, IOException, InterruptedException {
+ TXFailoverCommand cmd = mock(TXFailoverCommand.class);
+ Message msg = mock(Message.class);
+ ServerConnection serverConnection = mock(ServerConnection.class);
+ ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+ InternalDistributedMember client = mock(InternalDistributedMember.class);
+ TXManagerImpl txMgr = mock(TXManagerImpl.class);
+ InternalCache cache = Fakes.cache();
+ int uniqueId = 1;
+ TXId txId = new TXId(client, uniqueId);
+ TXStateProxyImpl proxy = new TXStateProxyImpl(txMgr, txId, null);
+ FindRemoteTXMessageReplyProcessor processor = mock(FindRemoteTXMessageReplyProcessor.class);
+ InternalDistributedMember host = mock(InternalDistributedMember.class);
+
+ doCallRealMethod().when(cmd).cmdExecute(msg, serverConnection, null, 1);
+ when(serverConnection.getProxyID()).thenReturn(clientProxyMembershipID);
+ when(clientProxyMembershipID.getDistributedMember()).thenReturn(client);
+ when(msg.getTransactionId()).thenReturn(uniqueId);
+ when(serverConnection.getCache()).thenReturn(cache);
+ when(cache.getCacheTransactionManager()).thenReturn(txMgr);
+ when(txMgr.getTXState()).thenReturn(proxy);
+ when(cmd.createTXId(client, uniqueId)).thenReturn(txId);
+ PowerMockito.mockStatic(FindRemoteTXMessage.class);
+ PowerMockito.when(FindRemoteTXMessage.send(cache, txId)).thenReturn(processor);
+ when(processor.getHostingMember()).thenReturn(host);
+ when(proxy.getCache()).thenReturn(cache);
+ when(cache.getDistributedSystem()).thenReturn(mock(DistributedSystem.class));
+
+ cmd.cmdExecute(msg, serverConnection, null, 1);
+ assertNotNull(proxy.getRealDeal(host));
+ assertEquals(proxy.getTarget(), host);
+ }
+}