You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:20 UTC
[02/51] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
index 7a306f0,0000000..720da56
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
@@@ -1,1527 -1,0 +1,1527 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test various distributed aspects of transactions,
+ * e.g. locking/reservation semantics that do not need multiple Region
+ * configurations. For those tests see
+ * <code>MultiVMRegionTestCase</code>.
+ *
+ *
+ * @author Mitch Thomas
+ * @since 4.0
+ * @see MultiVMRegionTestCase
+ *
+ */
+
+package com.gemstone.gemfire.cache30;
+
+
+//import com.gemstone.gemfire.*;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.Ignore;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.CommitIncompleteException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskAccessException;
+import com.gemstone.gemfire.cache.LoaderHelper;
+import com.gemstone.gemfire.cache.MirrorType;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.distributed.internal.ResourceEventsListener;
+import com.gemstone.gemfire.distributed.internal.locks.DLockBatch;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper;
+import com.gemstone.gemfire.internal.cache.CommitReplyException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXState;
+import com.gemstone.gemfire.internal.cache.TXStateInterface;
+import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
+//import com.gemstone.gemfire.internal.cache.locks.TXLockId;
+import com.gemstone.gemfire.internal.cache.locks.TXLockBatch;
+import com.gemstone.gemfire.internal.cache.locks.TXLockService;
+import com.gemstone.gemfire.internal.cache.locks.TXLockServiceImpl;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+public class TXDistributedDUnitTest extends CacheTestCase {
+ /**
+  * Creates the test case.
+  *
+  * @param name the test name, handed unchanged to the superclass constructor
+  */
+ public TXDistributedDUnitTest(String name) {
+   super(name);
+ }
+
+ /**
+  * Returns the region attributes used by default in this test: the ones built
+  * by {@link #getRegionAttributes(Scope)} for <code>Scope.DISTRIBUTED_ACK</code>.
+  */
+ protected RegionAttributes getRegionAttributes() {
+   final Scope defaultScope = Scope.DISTRIBUTED_ACK;
+   return getRegionAttributes(defaultScope);
+ }
+
+ /**
+  * Builds region attributes for the requested scope. When the scope reports
+  * itself as distributed-ack, early-ack is explicitly switched off.
+  *
+  * @param scope the scope to configure on the attributes
+  * @return freshly created attributes carrying that scope
+  */
+ protected RegionAttributes getRegionAttributes(Scope scope) {
+   final AttributesFactory attrs = new AttributesFactory();
+   attrs.setScope(scope);
+   final boolean ackScope = scope.isDistributedAck();
+   if (ackScope) {
+     attrs.setEarlyAck(false);
+   }
+   return attrs.create();
+ }
+
+ /**
+  * Test a remote grantor.
+  * <p>
+  * VM0 commits the first transaction and is asserted to be the transaction
+  * lock grantor. VM0's distributed system is then crashed; this VM commits and
+  * is asserted to be the grantor, and to still be the grantor after every
+  * dunit VM has committed a transaction of its own.
+  */
+ public void testRemoteGrantor() throws Exception {
+   IgnoredException.addIgnoredException("killing members ds");
+   final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
+   final String rgnName = getUniqueName();
+   Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
+   rgn.create("key", null);
+
+   Invoke.invokeInEveryVM(new SerializableRunnable("testRemoteGrantor: initial configuration") {
+     public void run() {
+       try {
+         Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
+         rgn1.put("key", "val0");
+       } catch (CacheException e) {
+         Assert.fail("While creating region", e);
+       }
+     }
+   });
+
+   Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+
+   vm0.invoke(new SerializableRunnable("testRemoteGrantor: remote grantor init") {
+     public void run() {
+       try {
+         Region rgn1 = getCache().getRegion(rgnName);
+         final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+         txMgr2.begin();
+         rgn1.put("key", "val1");
+         txMgr2.commit();
+         assertNotNull(TXLockService.getDTLS());
+         assertTrue(TXLockService.getDTLS().isLockGrantor());
+       } catch (CacheException e) {
+         // Keep the cause attached so a failure here can be diagnosed,
+         // matching the other handlers in this test.
+         Assert.fail("While performing first transaction", e);
+       }
+     }
+   });
+
+   // fix for bug 38843 causes the DTLS to be created in every TX participant
+   assertNotNull(TXLockService.getDTLS());
+   assertFalse(TXLockService.getDTLS().isLockGrantor());
+   assertEquals("val1", rgn.getEntry("key").getValue());
+
+   vm0.invoke(new SerializableRunnable("Disconnect from DS, remote grantor death") {
+     public void run() {
+       try {
+         MembershipManagerHelper.crashDistributedSystem(getSystem());
+       } finally {
+         // Allow getCache() to re-establish a ds connection
+         closeCache();
+       }
+     }
+   });
+
+   // Make this VM the remote Grantor
+   txMgr.begin();
+   rgn.put("key", "val2");
+   txMgr.commit();
+   assertNotNull(TXLockService.getDTLS());
+   assertTrue(TXLockService.getDTLS().isLockGrantor());
+
+   SerializableRunnable remoteComm =
+       new SerializableRunnable("testRemoteGrantor: remote grantor commit") {
+     public void run() {
+       try {
+         Cache c = getCache();
+         CacheTransactionManager txMgr2 = c.getCacheTransactionManager();
+         Region rgn1 = c.getRegion(rgnName);
+         if (rgn1 == null) {
+           // This block should only execute on VM0
+           rgn1 = c.createRegion(rgnName, getRegionAttributes());
+         }
+
+         txMgr2.begin();
+         rgn1.put("key", "val3");
+         txMgr2.commit();
+
+         // A VM that has a DTLS must not have taken over the grantor role.
+         if (TXLockService.getDTLS() != null) {
+           assertFalse(TXLockService.getDTLS().isLockGrantor());
+         }
+       } catch (CacheException e) {
+         Assert.fail("While creating region", e);
+       }
+     }
+   };
+   Invoke.invokeInEveryVM(remoteComm);
+
+   // This VM must still be the grantor and must see the last remote commit.
+   assertNotNull(TXLockService.getDTLS());
+   assertTrue(TXLockService.getDTLS().isLockGrantor());
+   assertEquals("val3", rgn.getEntry("key").getValue());
+   rgn.destroyRegion();
+ }
+
+ /**
+  * Test the internal callbacks used for what else... testing
+  * <p>
+  * Runs three rounds, over one, two and three regions (the third region is
+  * DISTRIBUTED_NO_ACK). Each round first commits a plain transaction and checks
+  * the value in VM0 and VM1, then commits a transaction with counting callbacks
+  * installed by {@link #setInternalCallbacks} and checks both the callback
+  * counts and the remote values.
+  */
+ public void testInternalCallbacks() throws Exception {
+   final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
+   final String rgnName1 = getUniqueName() + "_1";
+   final String rgnName2 = getUniqueName() + "_2";
+   final String rgnName3 = getUniqueName() + "_3";
+   Region rgn1 = getCache().createRegion(rgnName1, getRegionAttributes());
+
+   Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+
+   SerializableRunnable createRgn =
+       new SerializableRunnable("testInternalCallbacks: initial configuration") {
+     public void run() {
+       try {
+         Region rgn1a = getCache().createRegion(rgnName1, getRegionAttributes());
+         Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
+         Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
+         rgn1a.create("key", null);
+         rgn2.create("key", null);
+         rgn3.create("key", null);
+       } catch (CacheException e) {
+         Assert.fail("While creating region", e);
+       }
+     }
+   };
+   vm0.invoke(createRgn);
+   vm1.invoke(createRgn);
+
+   // Standard commit check
+   txMgr.begin();
+   rgn1.put("key", "value0");
+   txMgr.commit();
+   SerializableRunnable checkRgn1 =
+       new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
+     public void run() {
+       Region rgn1a = getCache().getRegion(rgnName1);
+       assertNotNull(rgn1a);
+       assertEquals("value0", rgn1a.getEntry("key").getValue());
+     }
+   };
+   vm0.invoke(checkRgn1);
+   vm1.invoke(checkRgn1);
+
+   {
+     final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
+     txMgr.begin();
+     ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
+     setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
+     rgn1.put("key", "value1");
+     txMgr.commit();
+     assertInternalCallbackCounts(cbSensors);
+   }
+   SerializableRunnable checkRgn1Again =
+       new SerializableRunnable("testInternalCallbacks: validate remote values") {
+     public void run() {
+       Region rgn1a = getCache().getRegion(rgnName1);
+       assertNotNull(rgn1a);
+       assertEquals("value1", rgn1a.getEntry("key").getValue());
+     }
+   };
+   vm0.invoke(checkRgn1Again);
+   vm1.invoke(checkRgn1Again);
+
+   // Try 2 regions
+   Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
+   txMgr.begin();
+   rgn1.put("key", "value2");
+   rgn2.put("key", "value2");
+   txMgr.commit();
+   SerializableRunnable checkRgn12 =
+       new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
+     public void run() {
+       Region rgn1a = getCache().getRegion(rgnName1);
+       assertNotNull(rgn1a);
+       assertEquals("value2", rgn1a.getEntry("key").getValue());
+       Region rgn2a = getCache().getRegion(rgnName2);
+       assertNotNull(rgn2a);
+       assertEquals("value2", rgn2a.getEntry("key").getValue());
+     }
+   };
+   vm0.invoke(checkRgn12);
+   vm1.invoke(checkRgn12);
+
+   {
+     final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
+     txMgr.begin();
+     ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
+     setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
+     rgn1.put("key", "value3");
+     rgn2.put("key", "value3");
+     txMgr.commit();
+
+     assertInternalCallbackCounts(cbSensors);
+   }
+   SerializableRunnable checkRgn12Again =
+       new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
+     public void run() {
+       Region rgn1a = getCache().getRegion(rgnName1);
+       assertNotNull(rgn1a);
+       assertEquals("value3", rgn1a.getEntry("key").getValue());
+       Region rgn2a = getCache().getRegion(rgnName2);
+       assertNotNull(rgn2a);
+       assertEquals("value3", rgn2a.getEntry("key").getValue());
+     }
+   };
+   vm0.invoke(checkRgn12Again);
+   vm1.invoke(checkRgn12Again);
+
+   // Try a third region (D_NO_ACK)
+   Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
+   txMgr.begin();
+   rgn1.put("key", "value4");
+   rgn2.put("key", "value4");
+   rgn3.put("key", "value4");
+   txMgr.commit();
+   SerializableRunnable checkRgn123 =
+       new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
+     public void run() {
+       Region rgn1a = getCache().getRegion(rgnName1);
+       assertNotNull(rgn1a);
+       assertEquals("value4", rgn1a.getEntry("key").getValue());
+       Region rgn2a = getCache().getRegion(rgnName2);
+       assertNotNull(rgn2a);
+       assertEquals("value4", rgn2a.getEntry("key").getValue());
+       Region rgn3a = getCache().getRegion(rgnName3);
+       assertNotNull(rgn3a);
+       assertEquals("value4", rgn3a.getEntry("key").getValue());
+     }
+   };
+   vm0.invoke(checkRgn123);
+   vm1.invoke(checkRgn123);
+
+   {
+     final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
+     txMgr.begin();
+     ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
+     setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
+
+     rgn1.put("key", "value5");
+     rgn2.put("key", "value5");
+     rgn3.put("key", "value5");
+     txMgr.commit();
+
+     assertInternalCallbackCounts(cbSensors);
+   }
+   SerializableRunnable checkRgn123Again =
+       new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
+     public void run() {
+       Region rgn1a = getCache().getRegion(rgnName1);
+       assertNotNull(rgn1a);
+       assertEquals("value5", rgn1a.getEntry("key").getValue());
+       Region rgn2a = getCache().getRegion(rgnName2);
+       assertNotNull(rgn2a);
+       assertEquals("value5", rgn2a.getEntry("key").getValue());
+       Region rgn3a = getCache().getRegion(rgnName3);
+       assertNotNull(rgn3a);
+       assertEquals("value5", rgn3a.getEntry("key").getValue());
+     }
+   };
+   vm0.invoke(checkRgn123Again);
+   vm1.invoke(checkRgn123Again);
+
+   // Clean up every region this test created locally, including the NO_ACK one.
+   rgn1.destroyRegion();
+   rgn2.destroyRegion();
+   rgn3.destroyRegion();
+ }
+
+ /**
+  * Asserts the callback counts recorded for one committed transaction.
+  * All slots except the last two must have been bumped exactly once; the last
+  * two slots (the "during" callbacks) must have been bumped exactly twice
+  * (NOTE(review): presumably once per remote VM hosting the region - confirm).
+  * Slots are checked in descending order, the "after" slots first.
+  *
+  * @param cbSensors the counter array filled in by the installed callbacks
+  */
+ private static void assertInternalCallbackCounts(byte[] cbSensors) {
+   for (int i = cbSensors.length - 3; i >= 0; --i) {
+     assertEquals("Internal callback " + i + " was not called the expected number of times!",
+         (byte) 1, cbSensors[i]);
+   }
+   for (int i = cbSensors.length - 1; i > cbSensors.length - 3; --i) {
+     assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!",
+         (byte) 2, cbSensors[i]);
+   }
+ }
+ /**
+  * Installs a counting hook on each internal commit callback of the given
+  * transaction. Every hook increments one slot of <code>cbSensors</code>:
+  * 0 afterReservation, 1 afterConflictCheck, 2 afterApplyChanges,
+  * 3 afterReleaseLocalLocks, 4 afterIndividualSend,
+  * 5 afterIndividualCommitProcess, 6 afterSend, 7 duringIndividualSend,
+  * 8 duringIndividualCommitProcess.
+  *
+  * @param txp the transaction state; it is cast to TXStateProxyImpl and forced
+  *        to bootstrap locally before the hooks are attached
+  * @param cbSensors counter array, asserted to hold exactly nine slots
+  */
+ static final void setInternalCallbacks(TXStateInterface txp, final byte[] cbSensors) {
+   // Hook that bumps a single, fixed slot of cbSensors each time it runs.
+   final class SlotCounter implements Runnable {
+     private final int slot;
+
+     SlotCounter(int slot) {
+       this.slot = slot;
+     }
+
+     public void run() {
+       cbSensors[slot]++;
+     }
+   }
+
+   final TXStateProxyImpl proxy = (TXStateProxyImpl) txp;
+   proxy.forceLocalBootstrap();
+   final TXState realTx = (TXState) proxy.getRealDeal(null, null);
+   assertEquals(9, cbSensors.length);
+
+   realTx.setAfterReservation(new SlotCounter(0));
+   realTx.setAfterConflictCheck(new SlotCounter(1));
+   realTx.setAfterApplyChanges(new SlotCounter(2));
+   realTx.setAfterReleaseLocalLocks(new SlotCounter(3));
+   realTx.setAfterIndividualSend(new SlotCounter(4));
+   realTx.setAfterIndividualCommitProcess(new SlotCounter(5));
+   realTx.setAfterSend(new SlotCounter(6));
+   realTx.setDuringIndividualSend(new SlotCounter(7));
+   realTx.setDuringIndividualCommitProcess(new SlotCounter(8));
+ }
+
+ /**
+  * Test distributed ack transactions whose changes come (partly or wholly)
+  * from values produced by a cache loader.
+  * <p>
+  * The local region has a loader returning <code>"val" + argument</code>; the
+  * regions in the other VMs are created with <code>MirrorType.KEYS</code>.
+  * Three transactions are committed - a plain put, a load only, and a put
+  * combined with a load - and the resulting values are checked in every VM.
+  */
+ public void testDACKLoadedMessage() throws Exception {
+   final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
+   final String rgnName = getUniqueName();
+
+   // Loader that derives the value from the argument passed to get().
+   final CacheLoader argLoader = new CacheLoader() {
+     public Object load(LoaderHelper helper) {
+       return "val" + helper.getArgument();
+     }
+     public void close() {}
+   };
+   AttributesFactory localAttrs = new AttributesFactory();
+   localAttrs.setScope(Scope.DISTRIBUTED_ACK);
+   localAttrs.setEarlyAck(false);
+   localAttrs.setCacheLoader(argLoader);
+   Region rgn = getCache().createRegion(rgnName, localAttrs.create());
+
+   SerializableRunnable createRemote =
+       new SerializableRunnable("testDACKLoadedMessage: intial configuration") {
+     public void run() {
+       try {
+         AttributesFactory remoteAttrs = new AttributesFactory();
+         remoteAttrs.setScope(Scope.DISTRIBUTED_ACK);
+         remoteAttrs.setEarlyAck(false);
+         remoteAttrs.setMirrorType(MirrorType.KEYS);
+         getCache().createRegion(rgnName, remoteAttrs.create());
+       } catch (CacheException e) {
+         Assert.fail("While creating region", e);
+       }
+     }
+   };
+   Invoke.invokeInEveryVM(createRemote);
+
+   // Round 1: an ordinary transactional put
+   txMgr.begin();
+   rgn.put("key1", "val1");
+   txMgr.commit();
+   assertEquals("val1", rgn.getEntry("key1").getValue());
+
+   SerializableRunnable verifyPut =
+       new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
+     public void run() {
+       Region remote = getCache().getRegion(rgnName);
+       assertEquals("val1", remote.getEntry("key1").getValue());
+     }
+   };
+   Invoke.invokeInEveryVM(verifyPut);
+
+   // Round 2: the transaction contains nothing but a loaded value
+   txMgr.begin();
+   rgn.get("key2", Integer.valueOf(2));
+   txMgr.commit();
+   assertEquals("val2", rgn.getEntry("key2").getValue());
+
+   SerializableRunnable verifyLoad =
+       new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
+     public void run() {
+       Region remote = getCache().getRegion(rgnName);
+       assertEquals("val2", remote.getEntry("key2").getValue());
+     }
+   };
+   Invoke.invokeInEveryVM(verifyLoad);
+
+   // Round 3: put plus load; this should use the ack w/ the lockid
+   txMgr.begin();
+   rgn.put("key3", "val3");
+   rgn.get("key4", Integer.valueOf(4));
+   txMgr.commit();
+
+   SerializableRunnable verifyMixed =
+       new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
+     public void run() {
+       Region remote = getCache().getRegion(rgnName);
+       assertEquals("val3", remote.getEntry("key3").getValue());
+       assertEquals("val4", remote.getEntry("key4").getValue());
+     }
+   };
+   Invoke.invokeInEveryVM(verifyMixed);
+ }
+
+ /**
+  * Returns the superclass's distributed system properties with
+  * <code>log-level</code> set to the dunit log level.
+  */
+ @Override
+ public Properties getDistributedSystemProperties() {
+   final Properties dsProps = super.getDistributedSystemProperties();
+   dsProps.put("log-level", LogWriterUtils.getDUnitLogLevel());
+   return dsProps;
+ }
+
+ public void testHighAvailabilityFeatures() throws Exception {
+ IgnoredException.addIgnoredException("DistributedSystemDisconnectedException");
+// final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
+// final TXManagerImpl txMgrImpl = (TXManagerImpl) txMgr;
+ final String rgnName = getUniqueName();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEarlyAck(false);
+ Region rgn = getCache().createRegion(rgnName, factory.create());
+ Invoke.invokeInEveryVM(new SerializableRunnable("testHighAvailabilityFeatures: intial region configuration") {
+ public void run() {
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ getCache().createRegion(rgnName, factory2.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ });
+
+ // create entries
+ rgn.put("key0", "val0_0");
+ rgn.put("key1", "val1_0");
+
+ Host host = Host.getHost(0);
+ // This test assumes that there are at least three VMs; the origin and two recipients
+ assertTrue(host.getVMCount() >= 3);
+ final VM originVM = host.getVM(0);
+
+ // Test that there is no commit after a partial commit message
+ // send (only sent to a minority of the recipients)
+ originVM.invoke(new SerializableRunnable("Flakey DuringIndividualSend Transaction") {
+ public void run() {
+ final Region rgn1 = getCache().getRegion(rgnName);
+ assertNotNull(rgn1);
+ try {
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ final CacheTransactionManager txMgrImpl = txMgr2;
+
+ txMgr2.begin();
+ // 1. setup an internal callback on originVM that will call
+ // disconnectFromDS() on the 2nd duringIndividualSend
+ // call.
+ ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
+ txState.setDuringIndividualSend(new Runnable() {
+ private int numCalled = 0;
+ public synchronized void run() {
+ ++numCalled;
+ rgn1.getCache().getLogger().info("setDuringIndividualSend Runnable called " + numCalled + " times");
+ if (numCalled > 1) {
+ MembershipManagerHelper.crashDistributedSystem(getSystem());
+ }
+ }
+ });
+ rgn1.put("key0", "val0_1");
+ rgn1.put("key1", "val1_1");
+ // 2. commit a transaction in originVM, it will disconnect from the DS
+ txMgr2.commit();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ rgn1.getCache().getLogger().warning("Ignoring Exception", e);
+ }
+ finally {
+ // Allow this VM to re-connect to the DS upon getCache() call
+ closeCache();
+ }
+ }
+ });
+ // 3. verify on all VMs that the transaction was not committed
+ final SerializableRunnable noChangeValidator =
+ new SerializableRunnable("testHighAvailabilityFeatures: validate no change in Region") {
+ public void run() {
+ Region rgn1 = getCache().getRegion(rgnName);
+ if (rgn1 == null) {
+ // Expect a null region from originVM
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ rgn1 = getCache().createRegion(rgnName, factory2.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ Region.Entry re = rgn1.getEntry("key0");
+ assertNotNull(re);
+ assertEquals("val0_0", re.getValue());
+ re = rgn1.getEntry("key1");
+ assertNotNull(re);
+ assertEquals("val1_0", re.getValue());
+ }
+ };
+ Invoke.invokeInEveryVM(noChangeValidator);
+
+ // Test that there is no commit after sending to all recipients
+ // but prior to sending the "commit process" message
+ originVM.invoke(new SerializableRunnable("Flakey AfterIndividualSend Transaction") {
+ public void run() {
+ final Region rgn1 = getCache().getRegion(rgnName);
+ assertNotNull(rgn1);
+ try {
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ final CacheTransactionManager txMgrImpl = txMgr2;
+
+ txMgr2.begin();
+ // 1. setup an internal callback on originVM that will call
+ // disconnectFromDS() on AfterIndividualSend
+ ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
+ txState.setAfterIndividualSend(new Runnable() {
+ public synchronized void run() {
+ MembershipManagerHelper.crashDistributedSystem(getSystem());
+ }
+ });
+ rgn1.put("key0", "val0_2");
+ rgn1.put("key1", "val1_2");
+ // 2. commit a transaction in originVM, it will disconnect from the DS
+ txMgr2.commit();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ rgn1.getCache().getLogger().warning("Ignoring Exception", e);
+ }
+ finally {
+ // Allow this VM to re-connect to the DS upon getCache() call
+ closeCache();
+ }
+ }
+ });
+ // 3. verify on all VMs, including the origin, that the transaction was not committed
+ Invoke.invokeInEveryVM(noChangeValidator);
+
+ // Test commit success upon a single commit process message received.
+ originVM.invoke(new SerializableRunnable("Flakey DuringIndividualCommitProcess Transaction") {
+ public void run() {
+ final Region rgn1 = getCache().getRegion(rgnName);
+ assertNotNull(rgn1);
+ try {
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ final CacheTransactionManager txMgrImpl = txMgr2;
+
+ txMgr2.begin();
+
+ ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
+ // 1. setup an internal callback on originVM that will call
+ // disconnectFromDS() on the 2nd internalDuringIndividualCommitProcess
+ // call.
+ txState.setDuringIndividualCommitProcess(new Runnable() {
+ private int numCalled = 0;
+ public synchronized void run() {
+ ++numCalled;
+ rgn1.getCache().getLogger().info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
+ if (numCalled > 1) {
+ MembershipManagerHelper.crashDistributedSystem(getSystem());
+ }
+ }
+ });
+ rgn1.put("key0", "val0_3");
+ rgn1.put("key1", "val1_3");
+ // 2. commit a transaction in originVM, it will disconnect from the DS
+ txMgr2.commit();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ rgn1.getCache().getLogger().warning("Ignoring Exception", e);
+ }
+ finally {
+ // Allow this VM to re-connect to the DS upon getCache() call
+ closeCache();
+ }
+ }
+ });
+ // 3. verify on all VMs that the transaction was committed (including the origin, due to GII)
+ SerializableRunnable nonSoloChangeValidator1 =
+ new SerializableRunnable("testHighAvailabilityFeatures: validate v1 non-solo Region changes") {
+ public void run() {
+ Region rgn1 = getCache().getRegion(rgnName);
+ if (rgn1 == null) {
+ // Expect a null region from originVM
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ rgn1 = getCache().createRegion(rgnName, factory2.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ long giveUp = System.currentTimeMillis() + 10000;
+ while (giveUp > System.currentTimeMillis()) {
+ try {
+ Region.Entry re = rgn1.getEntry("key0");
+ assertNotNull(re);
+ assertEquals("val0_3", re.getValue());
+ re = rgn1.getEntry("key1");
+ assertNotNull(re);
+ assertEquals("val1_3", re.getValue());
+ break;
+ } catch (AssertionFailedError e) {
+ if (giveUp > System.currentTimeMillis()) {
+ throw e;
+ }
+ }
+ }
+ }
+ };
+ Invoke.invokeInEveryVM(nonSoloChangeValidator1);
+
+ // Verify successful solo region commit after duringIndividualSend
+ // (same as afterIndividualSend).
+ // Create a region that only exists on the origin and another VM
+ final String soloRegionName = getUniqueName() + "_solo";
+ SerializableRunnable createSoloRegion =
+ new SerializableRunnable("testHighAvailabilityFeatures: solo region configuration") {
+ public void run() {
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ Region rgn1 = getCache().createRegion(soloRegionName, factory2.create());
+ rgn1.put("soloKey0", "soloVal0_0");
+ rgn1.put("soloKey1", "soloVal1_0");
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ };
+ final VM soloRegionVM = host.getVM(1);
+ originVM.invoke(createSoloRegion);
+ soloRegionVM.invoke(createSoloRegion);
+ originVM.invoke(new SerializableRunnable("Flakey solo region DuringIndividualSend Transaction") {
+ public void run() {
+ final Region soloRgn = getCache().getRegion(soloRegionName);
+ assertNotNull(soloRgn);
+ try {
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ final CacheTransactionManager txMgrImpl = txMgr2;
+
+ txMgr2.begin();
+ // 1. setup an internal callback on originVM that will call
+ // disconnectFromDS() on the 2nd duringIndividualSend
+ // call.
+ ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
+ txState.setDuringIndividualSend(new Runnable() {
+ private int numCalled = 0;
+ public synchronized void run() {
+ ++numCalled;
+ soloRgn.getCache().getLogger().info("setDuringIndividualSend Runnable called " + numCalled + " times");
+ if (numCalled > 1) {
+ MembershipManagerHelper.crashDistributedSystem(getSystem());
+ }
+ }
+ });
+ soloRgn.put("soloKey0", "soloVal0_1");
+ soloRgn.put("soloKey1", "soloVal1_1");
+ // 2. commit a transaction in originVM, it will disconnect from the DS
+ txMgr2.commit();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ soloRgn.getCache().getLogger().warning("Ignoring Exception", e);
+ }
+ finally {
+ // Allow this VM to re-connect to the DS upon getCache() call
+ closeCache();
+ }
+ }
+ });
+ // 3. verify on the soloRegionVM that the transaction was committed
+ final SerializableRunnable soloRegionCommitValidator1 =
+ new SerializableRunnable("testHighAvailabilityFeatures: validate successful v1 commit in solo Region") {
+ public void run() {
+ Region soloRgn = getCache().getRegion(soloRegionName);
+ if (soloRgn == null) {
+ // Expect a null region from originVM
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ soloRgn = getCache().createRegion(soloRegionName, factory2.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region ", e);
+ }
+ }
+ Region.Entry re = soloRgn.getEntry("soloKey0");
+ assertNotNull(re);
+ assertEquals("soloVal0_1", re.getValue());
+ re = soloRgn.getEntry("soloKey1");
+ assertNotNull(re);
+ assertEquals("soloVal1_1", re.getValue());
+ }
+ };
+ originVM.invoke(soloRegionCommitValidator1);
+ soloRegionVM.invoke(soloRegionCommitValidator1);
+ // verify no change in nonSolo region, re-establish region in originVM
+ Invoke.invokeInEveryVM(nonSoloChangeValidator1);
+
+ // Verify no commit for failed send (afterIndividualSend) for solo
+ // Region combined with non-solo Region
+ originVM.invoke(new SerializableRunnable("Flakey mixed (solo+non-solo) region DuringIndividualSend Transaction") {
+ public void run() {
+ final Region rgn1 = getCache().getRegion(rgnName);
+ assertNotNull(rgn1);
+ final Region soloRgn = getCache().getRegion(soloRegionName);
+ assertNotNull(soloRgn);
+ try {
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ final CacheTransactionManager txMgrImpl = txMgr2;
+
+ txMgr2.begin();
+ // 1. setup an internal callback on originVM that will call
+ // disconnectFromDS() on the afterIndividualSend
+ // call.
+ ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
+ txState.setAfterIndividualSend(new Runnable() {
+ public synchronized void run() {
+ MembershipManagerHelper.crashDistributedSystem(getSystem());
+ }
+ });
+
+ rgn1.put("key0", "val0_4");
+ rgn1.put("key1", "val1_4");
+ soloRgn.put("soloKey0", "soloVal0_2");
+ soloRgn.put("soloKey1", "soloVal1_2");
+
+ // 2. commit a transaction in originVM, it will disconnect from the DS
+ txMgr2.commit();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ rgn1.getCache().getLogger().warning("Ignoring Exception", e);
+ }
+ finally {
+ // Allow this VM to re-connect to the DS upon getCache() call
+ closeCache();
+ }
+ }
+ });
+ // Origin and Solo Region VM should be the same as last validation
+ originVM.invoke(soloRegionCommitValidator1);
+ soloRegionVM.invoke(soloRegionCommitValidator1);
+ Invoke.invokeInEveryVM(nonSoloChangeValidator1);
+
+ // Verify commit after sending a single
+ // (duringIndividualCommitProcess) commit process for solo Region
+ // combined with non-solo Region
+ originVM.invoke(new SerializableRunnable("Flakey mixed (solo+non-solo) region DuringIndividualCommitProcess Transaction") {
+ public void run() {
+ final Region rgn1 = getCache().getRegion(rgnName);
+ assertNotNull(rgn1);
+ final Region soloRgn = getCache().getRegion(soloRegionName);
+ assertNotNull(soloRgn);
+ try {
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ final CacheTransactionManager txMgrImpl = txMgr2;
+
+ txMgr2.begin();
+ // 1. setup an internal callback on originVM that will call
+ // disconnectFromDS() on the afterIndividualSend
+ // call.
+ ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
+ txState.setAfterIndividualSend(new Runnable() {
+ private int numCalled = 0;
+ public synchronized void run() {
+ ++numCalled;
+ rgn1.getCache().getLogger().info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
+ if (numCalled > 1) {
+ MembershipManagerHelper.crashDistributedSystem(getSystem());
+ }
+ }
+ });
+
+ rgn1.put("key0", "val0_5");
+ rgn1.put("key1", "val1_5");
+ soloRgn.put("soloKey0", "soloVal0_3");
+ soloRgn.put("soloKey1", "soloVal1_3");
+
+ // 2. commit a transaction in originVM, it will disconnect from the DS
+ txMgr2.commit();
+ }
+ catch (VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (Throwable e) {
+ rgn1.getCache().getLogger().warning("Ignoring Exception", e);
+ }
+ finally {
+ // Allow this VM to re-connect to the DS upon getCache() call
+ closeCache();
+ }
+ }
+ });
+ final SerializableRunnable soloRegionCommitValidator2 =
+ new SerializableRunnable("testHighAvailabilityFeatures: validate successful v2 commit in solo Region") {
+ public void run() {
+ Region soloRgn = getCache().getRegion(soloRegionName);
+ if (soloRgn == null) {
+ // Expect a null region from originVM
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ soloRgn = getCache().createRegion(soloRegionName, factory2.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region ", e);
+ }
+ }
+ Region.Entry re = soloRgn.getEntry("soloKey0");
+ assertNotNull(re);
+ assertEquals("soloVal0_3", re.getValue());
+ re = soloRgn.getEntry("soloKey1");
+ assertNotNull(re);
+ assertEquals("soloVal1_3", re.getValue());
+ }
+ };
+ originVM.invoke(soloRegionCommitValidator2);
+ soloRegionVM.invoke(soloRegionCommitValidator2);
+ SerializableRunnable nonSoloChangeValidator2 =
+ new SerializableRunnable("testHighAvailabilityFeatures: validate v2 non-solo Region changes") {
+ public void run() {
+ Region rgn1 = getCache().getRegion(rgnName);
+ if (rgn1 == null) {
+ // Expect a null region from originVM
+ try {
+ AttributesFactory factory2 = new AttributesFactory();
+ factory2.setScope(Scope.DISTRIBUTED_ACK);
+ factory2.setEarlyAck(false);
+ factory2.setDataPolicy(DataPolicy.REPLICATE);
+ rgn1 = getCache().createRegion(rgnName, factory2.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ Region.Entry re = rgn1.getEntry("key0");
+ assertNotNull(re);
+ assertEquals("val0_5", re.getValue());
+ re = rgn1.getEntry("key1");
+ assertNotNull(re);
+ assertEquals("val1_5", re.getValue());
+ }
+ };
+ Invoke.invokeInEveryVM(nonSoloChangeValidator2);
+ }
+
+ /**
+ * A class used in testLockBatchParticipantsUpdate to pause a transaction in the
+ * afterReservation and afterSend states.
+ * <p>
+ * Both pauses are implemented with wait/notifyAll on the
+ * <code>PausibleTX.class</code> monitor, so whoever drives the test has to
+ * notify on that same monitor to let the transaction proceed.
+ */
+ static public class PausibleTX implements Runnable {
+ // Set to true, under the PausibleTX.class monitor, by the afterReservation callback
+ public boolean isRunning = false;
+ // Name of the region looked up from myCache in run()
+ public String rgnName = null;
+ // Cache that supplies the region and the transaction manager
+ public Cache myCache = null;
+ // Key and value written inside the transaction
+ public Object key = null;
+ public Object value = null;
+
+ /** Returns the current value of {@link #isRunning}. */
+ public boolean getIsRunning() {
+ return this.isRunning;
+ }
+
+ /**
+ * Begins a transaction, installs the afterReservation and afterSend
+ * callbacks on its TXState, then puts key/value into the region and commits.
+ */
+ public void run() {
+ Region rgn = this.myCache.getRegion(this.rgnName);
+ final CacheTransactionManager txMgr = this.myCache.getCacheTransactionManager();
+ txMgr.begin();
+ ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
+ TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).getRealDeal(null,null);
+ // First pause point: flag that we are running, wake any waiter, then block
+ txState.setAfterReservation(new Runnable() {
+ public void run() {
+ try {
+ synchronized (PausibleTX.class) {
+ PausibleTX.this.isRunning = true;
+ // Notify the thread that created this, that we are ready
+ PausibleTX.class.notifyAll();
+ // Wait for the controller to start a GII and let us proceed
+ PausibleTX.class.wait();
+ }
+ }
+ catch (InterruptedException ie) {
+// PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
+ fail("interrupted");
+ }
+ }
+ });
+ // Second pause point: wake any waiter, then block until notified again
+ txState.setAfterSend(new Runnable() {
+ public void run() {
+ try {
+ synchronized (PausibleTX.class) {
+ // Notify the controller that we have sent the TX data (and the
+ // update)
+ PausibleTX.class.notifyAll();
+ // Wait until the controller has determined in fact the update
+ // took place
+ PausibleTX.class.wait();
+ }
+ }
+ catch (InterruptedException ie) {
+// PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
+ fail("interrupted");
+ }
+ }
+ });
+ try {
+ rgn.put(key, value);
+ txMgr.commit();
+ } catch (CommitConflictException cce) {
+ // A commit conflict is treated as a test failure
+ fail("Did not expect commit conflict exception when sending updates to new members in PausibleTX" + cce);
+// } catch (CacheException ce) {
+// fail("Did not expect cache exception when sending updates to new members in PausibleTX" + ce);
+ }
+ }
+ }
+
+ /**
+ * Looks up the id that the distribution manager reports for the VM in
+ * which this method runs.
+ *
+ * @return the distribution manager id, or <code>null</code> when
+ * <code>DistributedTestCase.system</code> has not been set
+ */
+ public static Serializable getSystemId() {
+ if (DistributedTestCase.system == null) {
+ return null;
+ }
+ return DistributedTestCase.system.getDistributionManager().getId();
+ }
+ /** Member ids the lock batch is compared against in the first lock batch check. */
+ static HashSet preTXSystemIds;
+ /** Stores <code>ids</code> in {@link #preTXSystemIds} of the VM this runs in. */
+ public static void setPreTXSystemIds(HashSet ids) {
+ preTXSystemIds = ids;
+ }
+ /** Member ids the lock batch is compared against in the second lock batch check. */
+ static HashSet postTXSystemIds;
+ /** Stores <code>ids</code> in {@link #postTXSystemIds} of the VM this runs in. */
+ public static void setPostTXSystemIds(HashSet ids) {
+ postTXSystemIds = ids;
+ }
+ /** Id of the member hosting the transaction; used to look up its lock batches. */
+ static Serializable txHostId;
+ /** Stores <code>id</code> in {@link #txHostId} of the VM this runs in. */
+ public static void setTXHostSystemId(Serializable id) {
+ txHostId = id;
+ }
+ /**
+ * Test update of lock batch participants (needed when new members are
+ * discovered between a commit's locking phase and the application of the
+ * Region's data). See bug 32999.
+ * <p>
+ * Outline: VM0 becomes the lock grantor, VM1 runs a {@link PausibleTX} that
+ * stops after reservation, VM2 then creates the region, and the lock batch
+ * held on VM0 is checked before and after VM1 is allowed to send.
+ */
+ public void testLockBatchParticipantsUpdate() throws Exception {
+// final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
+ final String rgnName = getUniqueName();
+ Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
+ rgn.create("key", null);
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ SerializableRunnable initRegions =
+ new SerializableRunnable("testLockBatchParticipantsUpdate: initial configuration") {
+ public void run() {
+ try {
+ Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
+ rgn1.create("key", null);
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ };
+ vm0.invoke(initRegions);
+ vm1.invoke(initRegions);
+ rgn.put("key", "val1");
+
+ // Connect vm2 also since it may have been shutdown when logPerTest
+ // is turned on
+ vm2.invoke(new SerializableRunnable("connect vm2 if not connected") {
+ public void run() {
+ getCache();
+ }
+ });
+
+ // Make VM0 the Grantor
+ vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: remote grantor init") {
+ public void run() {
+ try {
+ Region rgn1 = getCache().getRegion(rgnName);
+ final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
+ assertEquals("val1", rgn1.getEntry("key").getValue());
+ txMgr2.begin();
+ rgn1.put("key", "val2");
+ txMgr2.commit();
+ assertNotNull(TXLockService.getDTLS());
+ assertTrue(TXLockService.getDTLS().isLockGrantor());
+ } catch (CacheException e) {
+ // Keep the cause so the failure report shows what actually went wrong
+ Assert.fail("While performing first transaction", e);
+ }
+ }
+ });
+
+ // fix for bug 38843 causes the DTLS to be created in every TX participant
+ assertNotNull(TXLockService.getDTLS());
+ assertFalse(TXLockService.getDTLS().isLockGrantor());
+ assertEquals("val2", rgn.getEntry("key").getValue());
+
+ // Build sets of System Ids and set them up on VM0 for future batch member checks
+ HashSet txMembers = new HashSet(4);
+ txMembers.add(getSystemId());
- txMembers.add(vm0.invoke(TXDistributedDUnitTest.class, "getSystemId"));
- vm0.invoke(TXDistributedDUnitTest.class, "setPreTXSystemIds", new Object[] {txMembers});
- txMembers.add(vm2.invoke(TXDistributedDUnitTest.class, "getSystemId"));
- vm0.invoke(TXDistributedDUnitTest.class, "setPostTXSystemIds", new Object[] {txMembers});
++ txMembers.add(vm0.invoke(() -> TXDistributedDUnitTest.getSystemId()));
++ vm0.invoke(() -> TXDistributedDUnitTest.setPreTXSystemIds(txMembers));
++ txMembers.add(vm2.invoke(() -> TXDistributedDUnitTest.getSystemId()));
++ vm0.invoke(() -> TXDistributedDUnitTest.setPostTXSystemIds(txMembers));
+
+ // Don't include the tx host in the batch member set(s)
- Serializable vm1HostId = (Serializable) vm1.invoke(TXDistributedDUnitTest.class, "getSystemId");
- vm0.invoke(TXDistributedDUnitTest.class, "setTXHostSystemId", new Object[] {vm1HostId});
++ Serializable vm1HostId = (Serializable) vm1.invoke(() -> TXDistributedDUnitTest.getSystemId());
++ vm0.invoke(() -> TXDistributedDUnitTest.setTXHostSystemId(vm1HostId));
+
+ // Create a TX on VM1 (such that it will ask for locks on VM0) that uses the callbacks
+ // to pause and give us time to start a GII process on another VM
+ vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: slow tx (one that detects new member)") {
+ public void run() {
+ // fix for bug 38843 causes the DTLS to be created in every TX participant
+ assertNotNull(TXLockService.getDTLS());
+ assertFalse(TXLockService.getDTLS().isLockGrantor());
+
+ PausibleTX pauseTXRunnable = new PausibleTX();
+ pauseTXRunnable.rgnName = rgnName;
+ pauseTXRunnable.myCache = getCache();
+ pauseTXRunnable.key = "key";
+ pauseTXRunnable.value = "val3";
+ new Thread(pauseTXRunnable, "PausibleTX Thread").start();
+ // Block until the transaction thread reports it reached afterReservation
+ synchronized(PausibleTX.class) {
+ while(!pauseTXRunnable.getIsRunning()) {
+ try {
+ PausibleTX.class.wait();
+ }
+ catch (InterruptedException ie) {
+ fail("Did not expect " + ie);
+ }
+ }
+ }
+ }
+ });
+
+ // Verify that the lock batch exists VM0 and has the size we expect
+ vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Verify lock batch exists on VM0 with expected size") {
+ public void run() {
+ getCache().getRegion(rgnName);
+ TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
+ assertNotNull(dtls);
+ assertTrue(dtls.isLockGrantor());
+ DLockService dLockSvc = dtls.getInternalDistributedLockService();
+ assertNotNull(TXDistributedDUnitTest.txHostId);
+ DLockBatch[] batches = dLockSvc.getGrantor().getLockBatches((InternalDistributedMember)TXDistributedDUnitTest.txHostId);
+ // expected value first, so a failure message reads correctly
+ assertEquals(1, batches.length);
+ TXLockBatch txLockBatch = (TXLockBatch) batches[0];
+ assertNotNull(txLockBatch);
+ assertNotNull(TXDistributedDUnitTest.preTXSystemIds);
+ assertTrue("Members in lock batch " + txLockBatch.getParticipants() + " not the same as " + TXDistributedDUnitTest.preTXSystemIds,
+ txLockBatch.getParticipants().equals(TXDistributedDUnitTest.preTXSystemIds));
+ }
+ });
+
+ // Start a GII process on VM2
+ vm2.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: start GII") {
+ public void run() {
+ try {
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setEarlyAck(false);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ getCache().createRegion(rgnName, factory.create());
+ } catch (CacheException e) {
+ Assert.fail("While creating region", e);
+ }
+ }
+ });
+
+ // Notify TX on VM1 so that it can continue
+ vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to continue") {
+ public void run() {
+ synchronized(PausibleTX.class) {
+ // Notify VM1 that it should proceed to the TX send
+ PausibleTX.class.notifyAll();
+ // Wait until VM1 has sent the TX
+ try {
+ PausibleTX.class.wait();
+ }
+ catch (InterruptedException ie) {
+ fail("Did not expect " + ie);
+ }
+ }
+ }
+ });
+
+ // Verify that the batch on VM0 has added VM2 into the set
+ vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Verify lock batch contains VM2") {
+ public void run() {
+ getCache().getRegion(rgnName);
+ TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
+ assertNotNull(dtls);
+ assertTrue(dtls.isLockGrantor());
+ DLockService dLockSvc = dtls.getInternalDistributedLockService();
+ assertNotNull(TXDistributedDUnitTest.txHostId);
+ DLockBatch[] batches = dLockSvc.getGrantor().getLockBatches((InternalDistributedMember)TXDistributedDUnitTest.txHostId);
+ // expected value first, so a failure message reads correctly
+ assertEquals(1, batches.length);
+ TXLockBatch txLockBatch = (TXLockBatch) batches[0];
+ assertNotNull(txLockBatch);
+ // Guard the set that is actually compared below (the post-join set)
+ assertNotNull(TXDistributedDUnitTest.postTXSystemIds);
+ assertTrue("Members in lock batch " + txLockBatch.getParticipants() + " not the same as " + TXDistributedDUnitTest.postTXSystemIds,
+ txLockBatch.getParticipants().equals(TXDistributedDUnitTest.postTXSystemIds));
+ }
+ });
+ // fix for bug 38843 causes the DTLS to be created in every TX participant
+ assertNotNull(TXLockService.getDTLS());
+ assertFalse(TXLockService.getDTLS().isLockGrantor());
+ assertEquals("val3", rgn.getEntry("key").getValue());
+
+
+ // Notify TX on VM1 that it can go ahead and complete the TX
+ vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to finish") {
+ public void run() {
+ synchronized(PausibleTX.class) {
+ // Notify VM1 that it should finish the TX
+ PausibleTX.class.notifyAll();
+ }
+ }
+ });
+
+
+ rgn.destroyRegion();
+ }
+
+ /**
+ * Hitachi bug 38809: Applying an exception to a remote VM fails due to
+ * an IOException on a Region configured for LRU Overflow
+ */
+ public static final String TROUBLE_KEY = "GoBoom";
+ /**
+ * Test callable that throws a {@link DiskAccessException} whenever it is
+ * called for the entry whose key equals {@link #TROUBLE_KEY}; calls for
+ * any other key do nothing.
+ */
+ public static class TXTroubleMaker implements LocalRegion.TestCallable {
+ public void call(LocalRegion r, Operation op, RegionEntry re) {
+ final boolean isTroubleKey = TROUBLE_KEY.equals(re.getKey());
+ if (!isTroubleKey) {
+ return;
+ }
+ throw new DiskAccessException(TROUBLE_KEY, r);
+ }
+ }
+
+ /**
+ * Resource listener that blocks the handling of a
+ * <code>ResourceEvent.CACHE_REMOVE</code> event until
+ * {@link #unblockShutdown()} has been called. All other events are ignored.
+ */
+ public static class ShutdownListener implements ResourceEventsListener {
+ // Released exactly once by unblockShutdown()
+ final CountDownLatch latch = new CountDownLatch(1);
+ @Override
+ public void handleEvent(ResourceEvent event, Object resource) {
+ if (event.equals(ResourceEvent.CACHE_REMOVE)) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // Stop waiting, but restore the interrupt status so the caller
+ // can still observe that this thread was interrupted
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ /** Lets any thread blocked in {@link #handleEvent} continue. */
+ public void unblockShutdown() {
+ this.latch.countDown();
+ }
+ }
+
+ /**
+ * Commits a transaction from one VM while two other VMs throw a
+ * {@link DiskAccessException} for {@link #TROUBLE_KEY}, then checks the
+ * resulting {@link CommitIncompleteException} message and the region
+ * contents on the failing, the healthy and the originating VMs.
+ * <code>TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS</code> is reset in
+ * every VM when the test ends.
+ */
+ @Ignore("Disabled for 51260")
+ public void DISABLED_testRemoteCommitFailure() {
+ try {
+ disconnectAllFromDS();
+ final String rgnName1= getUniqueName() + "_1";
+ final String rgnName2 = getUniqueName() + "_2";
+ final String diskStoreName = getUniqueName() + "_store";
+ Host host = Host.getHost(0);
+ VM origin = host.getVM(0);
+ VM trouble1 = host.getVM(1);
+ VM trouble2 = host.getVM(2);
+ VM noTrouble = host.getVM(3);
+ CacheSerializableRunnable initRegions =
+ new CacheSerializableRunnable("Initialize no trouble regions") {
+ @Override
+ public void run2() {
+ getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
+ TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setDiskStoreName(diskStoreName);
+ getCache().createRegion(rgnName1, af.create());
+ getCache().createRegion(rgnName2, af.create());
+ }
+ };
+ origin.invoke(initRegions);
+ noTrouble.invoke(initRegions);
+ SerializableRunnable initTroubleRegions =
+ new CacheSerializableRunnable("Initialize regions that cause trouble") {
+ @Override
+ public void run2() {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ // Same regions as above, but created with the TXTroubleMaker test callable
+ InternalRegionArguments ira = new InternalRegionArguments().setTestCallable(new TXTroubleMaker());
+ try {
+ getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
+ TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setDiskStoreName(diskStoreName);
+ gfc.createVMRegion(rgnName1, af.create(), ira);
+ gfc.createVMRegion(rgnName2, af.create(), ira);
+ gfc.getDistributedSystem().addResourceListener(new ShutdownListener());
+ } catch (IOException ioe) {
+ // Keep the cause so the failure report carries the stack trace
+ Assert.fail("While creating trouble regions", ioe);
+ }
+ catch (TimeoutException e) {
+ Assert.fail("While creating trouble regions", e);
+ }
+ catch (ClassNotFoundException e) {
+ Assert.fail("While creating trouble regions", e);
+ }
+ }
+ };
+ trouble1.invoke(initTroubleRegions);
+ trouble2.invoke(initTroubleRegions);
+
+ SerializableRunnable doTransaction =
+ new CacheSerializableRunnable("Run failing transaction") {
+ @Override
+ public void run2() {
+ Cache c = getCache();
+ Region r1 = c.getRegion(rgnName1);
+ assertNotNull(r1);
+ Region r2 = c.getRegion(rgnName2);
+ assertNotNull(r2);
+ CacheTransactionManager txmgr = c.getCacheTransactionManager();
+ txmgr.begin();
+ r1.put("k1", "k1");
+ r1.put("k2", "k2");
+ r1.put(TROUBLE_KEY, TROUBLE_KEY);
+ r2.put("k1", "k1");
+ r2.put("k2", "k2");
+ r2.put(TROUBLE_KEY, TROUBLE_KEY);
+ try {
+ txmgr.commit();
+ fail("Expected an tx incomplete exception");
+ } catch (CommitIncompleteException yay) {
+ String msg = yay.getMessage();
+// getLogWriter().info("failing exception", yay);
+ // Each region on a trouble VM should be mentioned (two regions per trouble VM)
+ int ind=0, match=0;
+ while((ind = msg.indexOf(rgnName1, ind)) >= 0) {
+ ind++; match++;
+ }
+ assertEquals(2, match);
+ ind=match=0;
+ while((ind = msg.indexOf(rgnName2, ind)) >= 0) {
+ ind++; match++;
+ }
+ assertEquals(2, match);
+ // DiskAccessExceptions should be mentioned four times
+ ind=match=0;
+ while((ind = msg.indexOf(DiskAccessException.class.getName(), ind)) >= 0) {
+ ind++; match++;
+ }
+ assertEquals(4, match);
+ }
+ }
+ };
+ IgnoredException ee = null;
+ try {
+ ee = IgnoredException.addIgnoredException(DiskAccessException.class.getName() + "|" +
+ CommitIncompleteException.class.getName() + "|" +
+ CommitReplyException.class.getName());
+ origin.invoke(doTransaction);
+ } finally {
+ if (ee!=null) ee.remove();
+ }
+
+ // Release every ShutdownListener registered on the trouble VMs
+ SerializableCallable allowCacheToShutdown = new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+ List<ResourceEventsListener> listeners = cache.getDistributedSystem().getResourceListeners();
+ for (ResourceEventsListener l : listeners) {
+ if (l instanceof ShutdownListener) {
+ ShutdownListener shutListener = (ShutdownListener) l;
+ shutListener.unblockShutdown();
+ }
+ }
+ return null;
+ }
+ };
+ trouble1.invoke(allowCacheToShutdown);
+ trouble2.invoke(allowCacheToShutdown);
+
+ // Assert proper content on failing VMs
+ SerializableRunnable assertTroubledContent =
+ new CacheSerializableRunnable("Assert partail commit data") {
+ @Override
+ public void run2() {
+ final Cache c = getCache();
+ Wait.waitForCriterion(new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return c.getRegion(rgnName1) == null;
+ }
+
+ @Override
+ public String description() {
+ // Describe the condition so a timeout is diagnosable
+ return "waiting for getRegion(" + rgnName1 + ") to return null";
+ }
+ }, 30000, 1000, true);
+ Region r2 = c.getRegion(rgnName2);
+ assertNull(r2);
+ }
+ };
+ trouble1.invoke(assertTroubledContent);
+ trouble2.invoke(assertTroubledContent);
+
+ // Assert proper content on successful VMs
+ SerializableRunnable assertSuccessfulContent =
+ new CacheSerializableRunnable("Assert complete commit of data on successful VMs") {
+ @Override
+ public void run2() {
+ Cache c = getCache();
+ {
+ Region r1 = c.getRegion(rgnName1);
+ assertNotNull(r1);
+ assertEquals("k1", r1.getEntry("k1").getValue());
+ assertEquals("k2", r1.getEntry("k2").getValue());
+ assertEquals(TROUBLE_KEY, r1.getEntry(TROUBLE_KEY).getValue());
+ }
+ {
+ Region r2 = c.getRegion(rgnName2);
+ assertNotNull(r2);
+ assertEquals("k1", r2.getEntry("k1").getValue());
+ assertEquals("k2", r2.getEntry("k2").getValue());
+ assertEquals(TROUBLE_KEY, r2.getEntry(TROUBLE_KEY).getValue());
+ }
+ }
+ };
+ noTrouble.invoke(assertSuccessfulContent);
+
+ // Assert the entries are present on the originating VM
+ SerializableRunnable assertOriginContent =
+ new CacheSerializableRunnable("Assert data survives on origin VM") {
+ @Override
+ public void run2() {
+ Cache c = getCache();
+ {
+ Region r1 = c.getRegion(rgnName1);
+ assertNotNull(r1);
+ assertNotNull(r1.getEntry("k1"));
+ assertNotNull(r1.getEntry("k2"));
+ assertNotNull(r1.getEntry(TROUBLE_KEY));
+ }
+ {
+ Region r2 = c.getRegion(rgnName2);
+ assertNotNull(r2);
+ assertNotNull(r2.getEntry("k1"));
+ assertNotNull(r2.getEntry("k2"));
+ assertNotNull(r2.getEntry(TROUBLE_KEY));
+ }
+ }
+ };
+ origin.invoke(assertOriginContent);
+ } finally {
+ // Always restore the flag that the region initializers switched on
+ Invoke.invokeInEveryVM(new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
+ return null;
+ }
+ });
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
index a253f09,0000000..b1d7bab
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
@@@ -1,434 -1,0 +1,434 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.naming.Context;
+import javax.transaction.UserTransaction;
+
+import com.gemstone.gemfire.CopyHelper;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheEvent;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.LoaderHelper;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.TransactionEvent;
+import com.gemstone.gemfire.cache.TransactionListener;
+import com.gemstone.gemfire.cache.query.IndexType;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.transaction.Person;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.util.TransactionListenerAdapter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * Test the order of operations done on the farside of a tx.
+ *
+ * @author darrel
+ * @since 5.0
+ */
+public class TXOrderDUnitTest extends CacheTestCase {
+
+ private transient Region r;
+ private transient DistributedMember otherId;
+ protected transient int invokeCount;
+
+ /**
+ * Creates the test case.
+ *
+ * @param name passed unchanged to the superclass constructor
+ */
+ public TXOrderDUnitTest(String name) {
+ super(name);
+ }
+
+ /** Returns VM 0 of host 0, the VM these tests treat as the "other" side. */
+ private VM getOtherVm() {
+ return Host.getHost(0).getVM(0);
+ }
+
+ /**
+ * Calls getCache() in the other VM and then stores the distributed member
+ * reported by that VM in <code>otherId</code>.
+ */
+ private void initOtherId() {
+ VM vm = getOtherVm();
+ vm.invoke(new CacheSerializableRunnable("Connect") {
+ public void run2() throws CacheException {
+ getCache();
+ }
+ });
- this.otherId = (DistributedMember)vm.invoke(TXOrderDUnitTest.class, "getVMDistributedMember");
++ this.otherId = (DistributedMember)vm.invoke(() -> TXOrderDUnitTest.getVMDistributedMember());
+ }
+ /**
+ * In the other VM, creates root region "r1" with nested subregions "r2"
+ * and "r3", then commits one transaction containing six puts spread
+ * across the three regions.
+ */
+ private void doCommitOtherVm() {
+ getOtherVm().invoke(new CacheSerializableRunnable("create root") {
+ public void run2() throws CacheException {
+ AttributesFactory ackFactory = new AttributesFactory();
+ ackFactory.setScope(Scope.DISTRIBUTED_ACK);
+ Region root = createRootRegion("r1", ackFactory.create());
+ Region middle = root.createSubregion("r2", ackFactory.create());
+ Region leaf = middle.createSubregion("r3", ackFactory.create());
+ CacheTransactionManager txManager = getCache().getCacheTransactionManager();
+ txManager.begin();
+ // Key order here is b, c, a, a2, c2, b2; keep it exactly as is
+ middle.put("b", "value1");
+ leaf.put("c", "value2");
+ root.put("a", "value3");
+ root.put("a2", "value4");
+ leaf.put("c2", "value5");
+ middle.put("b2", "value6");
+ txManager.commit();
+ }
+ });
+ }
+
+ public static DistributedMember getVMDistributedMember() {
+ return InternalDistributedSystem.getAnyInstance().getDistributedMember();
+ }
+
+ ////////////////////// Test Methods //////////////////////
+
+ List expectedKeys;
+ int clCount = 0;
+
+ Object getCurrentExpectedKey() {
+ Object result = this.expectedKeys.get(this.clCount);
+ this.clCount += 1;
+ return result;
+ }
+ /**
+ * Makes sure listeners on the far side of a transaction are invoked in the
+ * correct order: the cache listener checks each created key against
+ * <code>expectedKeys</code>, and the transaction listener checks the
+ * complete key list of the commit.
+ */
+ public void testFarSideOrder() throws CacheException {
+ initOtherId();
+ AttributesFactory attrFactory = new AttributesFactory();
+ attrFactory.setDataPolicy(DataPolicy.REPLICATE);
+ attrFactory.setScope(Scope.DISTRIBUTED_ACK);
+ // Each create must carry the key found at the current position of expectedKeys
+ CacheListener keyOrderListener = new CacheListenerAdapter() {
+ public void afterCreate(EntryEvent e) {
+ assertEquals(getCurrentExpectedKey(), e.getKey());
+ }
+ };
+ attrFactory.addCacheListener(keyOrderListener);
+ Region root = createRootRegion("r1", attrFactory.create());
+ Region middle = root.createSubregion("r2", attrFactory.create());
+ middle.createSubregion("r3", attrFactory.create());
+
+ TransactionListener commitListener = new TransactionListenerAdapter() {
+ public void afterCommit(TransactionEvent e) {
+ assertEquals(6, e.getEvents().size());
+ ArrayList seenKeys = new ArrayList();
+ for (Object rawEvent : e.getEvents()) {
+ EntryEvent ee = (EntryEvent)rawEvent;
+ seenKeys.add(ee.getKey());
+ assertEquals(null, ee.getCallbackArgument());
+ assertEquals(true, ee.isCallbackArgumentAvailable());
+ }
+ assertEquals(TXOrderDUnitTest.this.expectedKeys, seenKeys);
+ TXOrderDUnitTest.this.invokeCount = 1;
+ }
+ };
+ CacheTransactionManager txManager = getCache().getCacheTransactionManager();
+ txManager.addListener(commitListener);
+
+ this.invokeCount = 0;
+ this.clCount = 0;
+ this.expectedKeys = Arrays.asList("b", "c", "a", "a2", "c2", "b2");
+ doCommitOtherVm();
+ // One commit callback and six create callbacks must have been seen
+ assertEquals(1, this.invokeCount);
+ assertEquals(6, this.clCount);
+ }
+
+ /**
+ * test bug#40870
+ * <p>
+ * VM 0 gets a region with a cache loader and a listener that expects a
+ * local-load operation; VM 1 gets the same region with a listener that
+ * expects the operation not to be a local load. VM 0 then performs a
+ * transactional get that goes through the loader.
+ * @throws Exception
+ */
+ public void _testFarSideOpForLoad() throws Exception {
+ Host host = Host.getHost(0);
+ VM loaderVm = host.getVM(0);
+ VM remoteVm = host.getVM(1);
+ loaderVm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ AttributesFactory attrFactory = new AttributesFactory();
+ attrFactory.setDataPolicy(DataPolicy.REPLICATE);
+ attrFactory.setScope(Scope.DISTRIBUTED_ACK);
+ attrFactory.addCacheListener(new CacheListenerAdapter() {
+ public void afterCreate(EntryEvent e) {
+ assertTrue(e.getOperation().isLocalLoad());
+ }
+ });
+ attrFactory.setCacheLoader(new CacheLoader() {
+ public Object load(LoaderHelper helper) throws CacheLoaderException {
+ LogWriterUtils.getLogWriter().info("Loading value:"+helper.getKey()+"_value");
+ return helper.getKey()+"_value";
+ }
+ public void close() {
+ }
+ });
+ createRootRegion("r1", attrFactory.create());
+ return null;
+ }
+ });
+
+ remoteVm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ AttributesFactory attrFactory = new AttributesFactory();
+ attrFactory.setDataPolicy(DataPolicy.REPLICATE);
+ attrFactory.setScope(Scope.DISTRIBUTED_ACK);
+ attrFactory.addCacheListener(new CacheListenerAdapter() {
+ public void afterCreate(EntryEvent e) {
+ LogWriterUtils.getLogWriter().info("op:"+e.getOperation().toString());
+ assertFalse(e.getOperation().isLocalLoad());
+ }
+ });
+ createRootRegion("r1", attrFactory.create());
+ return null;
+ }
+ });
+
+ // Transactional get in the loader VM
+ loaderVm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ Region loaded = getRootRegion("r1");
+ getCache().getCacheTransactionManager().begin();
+ loaded.get("obj_2");
+ getCache().getCacheTransactionManager().commit();
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Creates a partitioned region "testTxEventForRegion" in two VMs, commits
+ * a transaction against it from the first VM, and then checks in both VMs
+ * that neither the transaction listener nor the cache listener flagged an
+ * event whose region path differs from "/testTxEventForRegion".
+ */
+ public void testInternalRegionNotExposed() {
+ Host host = Host.getHost(0);
+ VM firstVm = host.getVM(0);
+ VM secondVm = host.getVM(1);
+ SerializableCallable createRegion = new SerializableCallable() {
+ public Object call() throws Exception {
+ ExposedRegionTransactionListener txListener = new ExposedRegionTransactionListener();
+ getCache().getCacheTransactionManager().addListener(txListener);
+ ExposedRegionCacheListener cacheListener = new ExposedRegionCacheListener();
+ AttributesFactory attrFactory = new AttributesFactory();
+ PartitionAttributes partitionAttrs = new PartitionAttributesFactory()
+ .setRedundantCopies(1)
+ .setTotalNumBuckets(1)
+ .create();
+ attrFactory.setPartitionAttributes(partitionAttrs);
+ attrFactory.addCacheListener(cacheListener);
+ createRootRegion("testTxEventForRegion", attrFactory.create());
+ return null;
+ }
+ };
+ firstVm.invoke(createRegion);
+ secondVm.invoke(createRegion);
+ firstVm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ Region pr = getRootRegion("testTxEventForRegion");
+ CacheTransactionManager txManager = getCache().getCacheTransactionManager();
+ // Entries written before the transaction starts
+ pr.put(2, "tw");
+ pr.put(3, "three");
+ pr.put(4, "four");
+ // One create, one update, one invalidate and one destroy in the transaction
+ txManager.begin();
+ pr.put(1, "one");
+ pr.put(2, "two");
+ pr.invalidate(3);
+ pr.destroy(4);
+ txManager.commit();
+ return null;
+ }
+ });
+ SerializableCallable verifyListener = new SerializableCallable() {
+ public Object call() throws Exception {
+ Region pr = getRootRegion("testTxEventForRegion");
+ CacheTransactionManager txManager = getCache().getCacheTransactionManager();
+ ExposedRegionTransactionListener txListener = (ExposedRegionTransactionListener)txManager.getListeners()[0];
+ ExposedRegionCacheListener cacheListener = (ExposedRegionCacheListener)pr.getAttributes().getCacheListeners()[0];
+ assertFalse(txListener.exceptionOccurred);
+ assertFalse(cacheListener.exceptionOccurred);
+ return null;
+ }
+ };
+ firstVm.invoke(verifyListener);
+ secondVm.invoke(verifyListener);
+ }
+
+ class ExposedRegionTransactionListener extends TransactionListenerAdapter {
+ private boolean exceptionOccurred = false;
+ @Override
+ public void afterCommit(TransactionEvent event) {
+ List<CacheEvent<?, ?>> events = event.getEvents();
+ for (CacheEvent<?, ?>e : events) {
+ if (!"/testTxEventForRegion".equals(e.getRegion().getFullPath())) {
+ exceptionOccurred = true;
+ }
+ }
+ }
+ }
+ class ExposedRegionCacheListener extends CacheListenerAdapter {
+ private boolean exceptionOccurred = false;
+ @Override
+ public void afterCreate(EntryEvent event) {
+ verifyRegion(event);
+ }
+ @Override
+ public void afterUpdate(EntryEvent event) {
+ verifyRegion(event);
+ }
+ private void verifyRegion(EntryEvent event) {
+ if (!"/testTxEventForRegion".equals(event.getRegion().getFullPath())) {
+ exceptionOccurred = true;
+ }
+ }
+ }
+
+ // Operation selectors passed to doTest(int); each picks the operation that
+ // doTest performs inside its second transaction. Declared static because they
+ // are compile-time constants and need not be stored per test instance.
+ /** Selects a transactional put of a modified copy of the entry. */
+ private static final int TEST_PUT = 0;
+ /** Selects a transactional invalidate of the entry. */
+ private static final int TEST_INVALIDATE = 1;
+ /** Selects a transactional destroy of the entry. */
+ private static final int TEST_DESTROY = 2;
+ /**
+  * Runs the shared index scenario in {@code doTest} with {@code TEST_PUT}:
+  * the second transaction overwrites the entry with a copy whose age is 55,
+  * after which the "age &lt; 50" query is expected to return no rows in either VM.
+  *
+  * @throws Exception if any remote invocation or assertion fails
+  */
+ public void testFarSideIndexOnPut() throws Exception {
+   doTest(TEST_PUT);
+ }
+
+ /**
+  * Runs the shared index scenario in {@code doTest} with
+  * {@code TEST_INVALIDATE}: the second transaction invalidates the entry,
+  * after which the "age &lt; 50" query is expected to return no rows in either VM.
+  *
+  * @throws Exception if any remote invocation or assertion fails
+  */
+ public void testFarSideIndexOnInvalidate() throws Exception {
+   doTest(TEST_INVALIDATE);
+ }
+
+ /**
+  * Runs the shared index scenario in {@code doTest} with {@code TEST_DESTROY}:
+  * the second transaction destroys the entry, after which the "age &lt; 50"
+  * query is expected to return no rows in either VM.
+  *
+  * @throws Exception if any remote invocation or assertion fails
+  */
+ public void testFarSideIndexOnDestroy() throws Exception {
+   doTest(TEST_DESTROY);
+ }
+
+ /**
+  * Shared scenario for the far-side index tests.
+  * <p>
+  * Both VMs create a replicated, distributed-ack root region named "sample"
+  * with a functional index on "age". VM 0 then creates one Person (age 45) in
+  * a JTA transaction, checks that the query "age &lt; 50" finds it, and in a
+  * second JTA transaction applies the operation selected by {@code op}.
+  * Afterwards the same query must return no rows in VM 0 and in VM 1.
+  *
+  * @param op one of TEST_PUT, TEST_INVALIDATE or TEST_DESTROY; any other value
+  *           fails the test with "unknown op"
+  * @throws Exception if a remote invocation or assertion fails
+  */
+ private void doTest(final int op) throws Exception {
+   Host host = Host.getHost(0);
+   VM vm1 = host.getVM(0);
+   VM vm2 = host.getVM(1);
+   SerializableCallable createRegionAndIndex = new SerializableCallable() {
+     public Object call() throws Exception {
+       AttributesFactory af = new AttributesFactory();
+       af.setDataPolicy(DataPolicy.REPLICATE);
+       af.setScope(Scope.DISTRIBUTED_ACK);
+       // The returned Region handle is not needed here; later steps look it up by name.
+       createRootRegion("sample", af.create());
+       QueryService qs = getCache().getQueryService();
+       qs.createIndex("foo", IndexType.FUNCTIONAL, "age", "/sample");
+       return null;
+     }
+   };
+   vm1.invoke(createRegionAndIndex);
+   vm2.invoke(createRegionAndIndex);
+
+   //do transactional puts in vm1
+   vm1.invoke(new SerializableCallable() {
+     public Object call() throws Exception {
+       Context ctx = getCache().getJNDIContext();
+       UserTransaction utx = (UserTransaction)ctx.lookup("java:/UserTransaction");
+       Region region = getRootRegion("sample");
+       // valueOf replaces the deprecated boxing constructor; the key is equal either way.
+       Integer x = Integer.valueOf(0);
+       utx.begin();
+       region.create(x, new Person("xyz", 45));
+       utx.commit();
+       QueryService qs = getCache().getQueryService();
+       Query q = qs.newQuery("select * from /sample where age < 50");
+       assertEquals("entry with age 45 should match before the second transaction",
+           1, ((SelectResults)q.execute()).size());
+       // Work on a copy so the age change only reaches the region through the put below.
+       Person dsample = (Person)CopyHelper.copy(region.get(x));
+       dsample.setAge(55);
+       utx.begin();
+       switch (op) {
+       case TEST_PUT:
+         region.put(x, dsample);
+         break;
+       case TEST_INVALIDATE:
+         region.invalidate(x);
+         break;
+       case TEST_DESTROY:
+         region.destroy(x);
+         break;
+       default:
+         fail("unknown op");
+       }
+       utx.commit();
+       assertEquals("no entry should match in the committing VM after the second transaction",
+           0, ((SelectResults)q.execute()).size());
+       return null;
+     }
+   });
+
+   //run query and verify results in other vm
+   vm2.invoke(new SerializableCallable() {
+     public Object call() throws Exception {
+       QueryService qs = getCache().getQueryService();
+       Query q = qs.newQuery("select * from /sample where age < 50");
+       assertEquals("no entry should match in the other VM after the second transaction",
+           0, ((SelectResults)q.execute()).size());
+       return null;
+     }
+   });
+ }
+
+ /**
+  * Creates a replicated region named after the test method in two VMs. In the
+  * first VM a transaction puts a byte[] under "key1" and invalidates the
+  * pre-existing "ikey". The second VM then checks that "key1" reads back as a
+  * non-null byte[] and that "ikey" reads as null.
+  */
+ public void testBug43353() {
+   Host dunitHost = Host.getHost(0);
+   VM txVm = dunitHost.getVM(0);
+   VM observerVm = dunitHost.getVM(1);
+
+   SerializableCallable createReplicate = new SerializableCallable() {
+     public Object call() throws Exception {
+       getCache().createRegionFactory(RegionShortcut.REPLICATE).create(getTestMethodName());
+       return null;
+     }
+   };
+
+   txVm.invoke(createReplicate);
+   observerVm.invoke(createReplicate);
+
+   // Seed "ikey" outside the transaction, then put and invalidate inside one.
+   txVm.invoke(new SerializableCallable() {
+     public Object call() throws Exception {
+       Region replicate = getCache().getRegion(getTestMethodName());
+       replicate.put("ikey", "value");
+       CacheTransactionManager txManager = getCache().getCacheTransactionManager();
+       txManager.begin();
+       replicate.put("key1", new byte[20]);
+       replicate.invalidate("ikey");
+       txManager.commit();
+       return null;
+     }
+   });
+
+   // Check what the other VM sees after the commit.
+   observerVm.invoke(new SerializableCallable() {
+     public Object call() throws Exception {
+       Region replicate = getCache().getRegion(getTestMethodName());
+       Object committedValue = replicate.get("key1");
+       assertNotNull(committedValue);
+       assertTrue(committedValue instanceof byte[]);
+       assertNull(replicate.get("ikey"));
+       return null;
+     }
+   });
+ }
+}