Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:20 UTC

[02/51] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
index 7a306f0,0000000..720da56
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXDistributedDUnitTest.java
@@@ -1,1527 -1,0 +1,1527 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +/** 
 + * Test various distributed aspects of transactions,
 + * e.g. locking/reservation semantics that do not need multiple Region
 + * configurations. For those tests see
 + * <code>MultiVMRegionTestCase</code>.
 + * 
 + *
 + * @author Mitch Thomas
 + * @since 4.0
 + * @see MultiVMRegionTestCase
 + *
 + */
 +
 +package com.gemstone.gemfire.cache30;
 +
 +
 +//import com.gemstone.gemfire.*;
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Properties;
 +import java.util.concurrent.CountDownLatch;
 +
 +import junit.framework.AssertionFailedError;
 +
 +import org.junit.Ignore;
 +
 +import com.gemstone.gemfire.SystemFailure;
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheLoader;
 +import com.gemstone.gemfire.cache.CacheTransactionManager;
 +import com.gemstone.gemfire.cache.CommitConflictException;
 +import com.gemstone.gemfire.cache.CommitIncompleteException;
 +import com.gemstone.gemfire.cache.DataPolicy;
 +import com.gemstone.gemfire.cache.DiskAccessException;
 +import com.gemstone.gemfire.cache.LoaderHelper;
 +import com.gemstone.gemfire.cache.MirrorType;
 +import com.gemstone.gemfire.cache.Operation;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionAttributes;
 +import com.gemstone.gemfire.cache.Scope;
 +import com.gemstone.gemfire.cache.TimeoutException;
 +import com.gemstone.gemfire.distributed.internal.ResourceEvent;
 +import com.gemstone.gemfire.distributed.internal.ResourceEventsListener;
 +import com.gemstone.gemfire.distributed.internal.locks.DLockBatch;
 +import com.gemstone.gemfire.distributed.internal.locks.DLockService;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper;
 +import com.gemstone.gemfire.internal.cache.CommitReplyException;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
 +import com.gemstone.gemfire.internal.cache.LocalRegion;
 +import com.gemstone.gemfire.internal.cache.RegionEntry;
 +import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 +import com.gemstone.gemfire.internal.cache.TXState;
 +import com.gemstone.gemfire.internal.cache.TXStateInterface;
 +import com.gemstone.gemfire.internal.cache.TXStateProxyImpl;
 +//import com.gemstone.gemfire.internal.cache.locks.TXLockId;
 +import com.gemstone.gemfire.internal.cache.locks.TXLockBatch;
 +import com.gemstone.gemfire.internal.cache.locks.TXLockService;
 +import com.gemstone.gemfire.internal.cache.locks.TXLockServiceImpl;
 +import com.gemstone.gemfire.test.dunit.Assert;
 +import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 +import com.gemstone.gemfire.test.dunit.IgnoredException;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.SerializableCallable;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +import com.gemstone.gemfire.test.dunit.WaitCriterion;
 +
 +public class TXDistributedDUnitTest extends CacheTestCase {
 +  public TXDistributedDUnitTest(String name) {
 +    super(name);
 +  }
 +
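 +  /**
 +   * Region attributes used by these tests: the requested distributed scope,
 +   * with early ack disabled for distributed-ack regions.
 +   */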
 +  protected RegionAttributes getRegionAttributes() {
 +    return this.getRegionAttributes(Scope.DISTRIBUTED_ACK);
 +  }
 +
 +  protected RegionAttributes getRegionAttributes(Scope scope) {
 +    AttributesFactory factory = new AttributesFactory();
 +    factory.setScope(scope);
 +    if (scope.isDistributedAck()) {
 +      factory.setEarlyAck(false);
 +    }
 +    return factory.create();
 +  }
 +
 +  /**
 +   * Test transactions when the distributed lock grantor is a remote member,
 +   * including recovery of the grantor role after that member crashes.
 +   */
 +  public void testRemoteGrantor() throws Exception {
 +    IgnoredException.addIgnoredException("killing members ds");
 +    final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
 +    final String rgnName = getUniqueName();
 +    Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
 +    rgn.create("key", null);
 +
 +    Invoke.invokeInEveryVM(new SerializableRunnable("testRemoteGrantor: initial configuration") {
 +        public void run() {
 +          try {
 +            Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
 +            rgn1.put("key", "val0");
 +          } catch (CacheException e) {
 +            Assert.fail("While creating region", e);
 +          }
 +        }
 +      });
 +    
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +//    VM vm1 = host.getVM(1);
 +//    VM vm2 = host.getVM(2);
 +
 +    vm0.invoke(new SerializableRunnable("testRemoteGrantor: remote grantor init") {
 +        public void run() {
 +          try {
 +            Region rgn1 = getCache().getRegion(rgnName);
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            txMgr2.begin();
 +            rgn1.put("key", "val1");
 +            txMgr2.commit();
 +            assertNotNull(TXLockService.getDTLS());
 +            assertTrue(TXLockService.getDTLS().isLockGrantor());
 +          } catch (CacheException e) {
 +            fail("While performing first transaction");
 +          }
 +        }
 +      });
 +
 +    // fix for bug 38843 causes the DTLS to be created in every TX participant
 +    assertNotNull(TXLockService.getDTLS());
 +    assertFalse(TXLockService.getDTLS().isLockGrantor());
 +    assertEquals("val1", rgn.getEntry("key").getValue());
 +
 +    vm0.invoke(new SerializableRunnable("Disconnect from DS, remote grantor death") {
 +        public void run() {
 +            try {
 +              MembershipManagerHelper.crashDistributedSystem(getSystem());
 +            } finally {
 +                // Allow getCache() to re-establish a ds connection
 +                closeCache();
 +            }
 +        }
 +      });
 +
 +    // Make this VM the remote Grantor
 +    txMgr.begin();
 +    rgn.put("key", "val2");
 +    txMgr.commit();
 +    assertNotNull(TXLockService.getDTLS());
 +    assertTrue(TXLockService.getDTLS().isLockGrantor());
 +
 +    SerializableRunnable remoteComm = 
 +      new SerializableRunnable("testRemoteGrantor: remote grantor commit") {
 +        public void run() {
 +          try {
 +            Cache c = getCache();
 +            CacheTransactionManager txMgr2 = c.getCacheTransactionManager();
 +            Region rgn1 = c.getRegion(rgnName);
 +            if (rgn1 == null) {
 +                // This block should only execute on VM0
 +                rgn1 = c.createRegion(rgnName, getRegionAttributes());
 +            }
 +
 +            txMgr2.begin();
 +            rgn1.put("key", "val3");
 +            txMgr2.commit();
 +
 +            if (TXLockService.getDTLS() != null) {
 +              assertTrue(!TXLockService.getDTLS().isLockGrantor());
 +            }
 +          } catch (CacheException e) {
 +            Assert.fail("While creating region", e);
 +          }
 +        }
 +      };
 +    Invoke.invokeInEveryVM(remoteComm);
 +    // vm1.invoke(remoteComm);
 +    // vm2.invoke(remoteComm);
 +
 +    assertNotNull(TXLockService.getDTLS());
 +    assertTrue(TXLockService.getDTLS().isLockGrantor());
 +    assertEquals("val3", rgn.getEntry("key").getValue());
 +    rgn.destroyRegion();
 +  }
 +
 +  /**
 +   * Test the internal callbacks used for what else... testing
 +   */
 +  public void testInternalCallbacks() throws Exception {
 +    final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
 +    final String rgnName1 = getUniqueName() + "_1";
 +    final String rgnName2 = getUniqueName() + "_2";
 +    final String rgnName3 = getUniqueName() + "_3";
 +    Region rgn1 = getCache().createRegion(rgnName1, getRegionAttributes());
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    SerializableRunnable createRgn = 
 +      new SerializableRunnable("testInternalCallbacks: initial configuration") {
 +        public void run() {
 +          try {
 +            Region rgn1a = getCache().createRegion(rgnName1, getRegionAttributes());
 +            Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
 +            Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
 +            rgn1a.create("key", null);
 +            rgn2.create("key", null);
 +            rgn3.create("key", null);
 +          } catch (CacheException e) {
 +            Assert.fail("While creating region", e);
 +          }
 +        }
 +      };
 +    vm0.invoke(createRgn);
 +    vm1.invoke(createRgn);
 +   
 +    // Standard commit check
 +    txMgr.begin();
 +    rgn1.put("key", "value0");
 +    txMgr.commit();
 +    SerializableRunnable checkRgn1 = 
 +      new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
 +        public void run() {
 +          Region rgn1a = getCache().getRegion(rgnName1);
 +          assertNotNull(rgn1a);
 +          assertEquals("value0", rgn1a.getEntry("key").getValue());
 +        }
 +      };
 +    vm0.invoke(checkRgn1);
 +    vm1.invoke(checkRgn1);
 +
 +    {
 +      final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
 +      txMgr.begin();
 +      ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
 +      setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
 +      rgn1.put("key", "value1");
 +      txMgr.commit();
 +      for(int i=cbSensors.length-3; i>=0; --i) {
 +        assertEquals("Internal callback " + i + " was not called the expected number of times!", 
 +                     (byte) 1, cbSensors[i]);
 +      }
 +      for(int i=cbSensors.length-1; i>cbSensors.length-3; --i) {
 +        assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!", 
 +                     (byte) 2, cbSensors[i]);
 +      }
 +    }
 +    SerializableRunnable checkRgn1Again = 
 +      new SerializableRunnable("testInternalCallbacks: validate remote values") {
 +        public void run() {
 +          Region rgn1a = getCache().getRegion(rgnName1);
 +          assertNotNull(rgn1a);
 +          assertEquals("value1", rgn1a.getEntry("key").getValue());
 +        }
 +      };
 +    vm0.invoke(checkRgn1Again);
 +    vm1.invoke(checkRgn1Again);
 +    
 +    // Try 2 regions
 +    Region rgn2 = getCache().createRegion(rgnName2, getRegionAttributes());
 +    txMgr.begin();
 +    rgn1.put("key", "value2");
 +    rgn2.put("key", "value2");
 +    txMgr.commit();
 +    SerializableRunnable checkRgn12 = 
 +      new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
 +        public void run() {
 +          Region rgn1a = getCache().getRegion(rgnName1);
 +          assertNotNull(rgn1a);
 +          assertEquals("value2", rgn1a.getEntry("key").getValue());
 +          Region rgn2a = getCache().getRegion(rgnName2);
 +          assertNotNull(rgn2a);
 +          assertEquals("value2", rgn2a.getEntry("key").getValue());
 +        }
 +      };
 +    vm0.invoke(checkRgn12);
 +    vm1.invoke(checkRgn12);
 +
 +    {
 +      final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
 +      txMgr.begin();
 +      ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
 +      setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
 +      rgn1.put("key", "value3");
 +      rgn2.put("key", "value3");
 +      txMgr.commit();
 +      
 +      for(int i=cbSensors.length-3; i>=0; i--) {
 +        assertEquals("Internal callback " + i + " was not called the expected number of times!", 
 +                     (byte) 1, cbSensors[i]);
 +      }
 +      for(int i=cbSensors.length-1; i> cbSensors.length-3; --i) {
 +        assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!", 
 +                     (byte) 2, cbSensors[i]);
 +      }
 +    }
 +    SerializableRunnable checkRgn12Again = 
 +      new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
 +        public void run() {
 +          Region rgn1a = getCache().getRegion(rgnName1);
 +          assertNotNull(rgn1a);
 +          assertEquals("value3", rgn1a.getEntry("key").getValue());
 +          Region rgn2a = getCache().getRegion(rgnName2);
 +          assertNotNull(rgn2a);
 +          assertEquals("value3", rgn2a.getEntry("key").getValue());
 +        }
 +      };
 +    vm0.invoke(checkRgn12Again);
 +    vm1.invoke(checkRgn12Again);
 +
 +    // Try a third region (D_NO_ACK)
 +    Region rgn3 = getCache().createRegion(rgnName3, getRegionAttributes(Scope.DISTRIBUTED_NO_ACK));
 +    txMgr.begin();
 +    rgn1.put("key", "value4");
 +    rgn2.put("key", "value4");
 +    rgn3.put("key", "value4");
 +    txMgr.commit();
 +    SerializableRunnable checkRgn123 = 
 +      new SerializableRunnable("testInternalCallbacks: check rgn1 valus") {
 +        public void run() {
 +          Region rgn1a = getCache().getRegion(rgnName1);
 +          assertNotNull(rgn1a);
 +          assertEquals("value4", rgn1a.getEntry("key").getValue());
 +          Region rgn2a = getCache().getRegion(rgnName2);
 +          assertNotNull(rgn2a);
 +          assertEquals("value4", rgn2a.getEntry("key").getValue());
 +          Region rgn3a = getCache().getRegion(rgnName3);
 +          assertNotNull(rgn3a);
 +          assertEquals("value4", rgn3a.getEntry("key").getValue());
 +        }
 +      };
 +    vm0.invoke(checkRgn123);
 +    vm1.invoke(checkRgn123);
 +
 +    {
 +      final byte cbSensors[] = {0,0,0,0,0,0,0,0,0};
 +      txMgr.begin();
 +      ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
 +      setInternalCallbacks(((TXManagerImpl)txMgr).getTXState(), cbSensors);
 +
 +      rgn1.put("key", "value5");
 +      rgn2.put("key", "value5");
 +      rgn3.put("key", "value5");
 +      txMgr.commit();
 +      
 +      for(int i=cbSensors.length-3; i>=0; i--) {
 +        assertEquals("Internal callback " + i + " was not called the expected number of times!", 
 +                     (byte) 1, cbSensors[i]);
 +      }
 +      for(int i=cbSensors.length-1; i> cbSensors.length-3; --i) {
 +        assertEquals("Internal \"during\" callback " + i + " invoked an unexpected number of times!", 
 +                     (byte) 2, cbSensors[i]);
 +      }
 +    }
 +    SerializableRunnable checkRgn123Again = 
 +      new SerializableRunnable("testInternalCallbacks: validate both regions remote values") {
 +        public void run() {
 +          Region rgn1a = getCache().getRegion(rgnName1);
 +          assertNotNull(rgn1a);
 +          assertEquals("value5", rgn1a.getEntry("key").getValue());
 +          Region rgn2a = getCache().getRegion(rgnName2);
 +          assertNotNull(rgn2a);
 +          assertEquals("value5", rgn2a.getEntry("key").getValue());
 +          Region rgn3a = getCache().getRegion(rgnName3);
 +          assertNotNull(rgn3a);
 +          assertEquals("value5", rgn3a.getEntry("key").getValue());
 +        }
 +      };
 +    vm0.invoke(checkRgn123Again);
 +    vm1.invoke(checkRgn123Again);
 +    
 +    rgn1.destroyRegion();
 +    rgn2.destroyRegion();
 +  }
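 +
 +  /**
 +   * Wires each of the nine internal commit callbacks to a counter in cbSensors.
 +   * The assertions above expect the "after" callbacks (indices 0-6) to fire once
 +   * per commit and the "during" callbacks (indices 7-8) to fire twice, presumably
 +   * once per remote recipient.
 +   */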
 +  static final void setInternalCallbacks(TXStateInterface txp, final byte[] cbSensors) {
 +    ((TXStateProxyImpl)txp).forceLocalBootstrap();
 +    TXState tx = (TXState)((TXStateProxyImpl)txp).getRealDeal(null,null);
 +    assertEquals(9, cbSensors.length);
 +    tx.setAfterReservation(new Runnable() {
 +        public void run() {
 +          cbSensors[0]++;
 +        }
 +      });
 +    tx.setAfterConflictCheck(new Runnable() {
 +        public void run() {
 +          cbSensors[1]++;
 +        }
 +      });
 +    tx.setAfterApplyChanges(new Runnable() {
 +        public void run() {
 +          cbSensors[2]++;            
 +        }
 +      });
 +    tx.setAfterReleaseLocalLocks(new Runnable() {
 +        public void run() {
 +          cbSensors[3]++;            
 +        }
 +      });
 +    tx.setAfterIndividualSend(new Runnable() {
 +        public void run() {
 +          cbSensors[4]++;
 +        }
 +      });
 +    tx.setAfterIndividualCommitProcess(new Runnable() {
 +        public void run() {
 +          cbSensors[5]++;
 +        }
 +      });
 +    tx.setAfterSend(new Runnable() {
 +        public void run() {
 +          cbSensors[6]++;
 +        }
 +      });
 +    tx.setDuringIndividualSend(new Runnable() {
 +        public void run() {
 +          cbSensors[7]++;            
 +        }
 +      });
 +    tx.setDuringIndividualCommitProcess(new Runnable() {
 +        public void run() {
 +          cbSensors[8]++;
 +        }
 +      });
 +  }
 +
 +  /** 
 +   * Test distributed ack transactions that consist only of 
 +   * data from loaded values
 +   */
 +  public void testDACKLoadedMessage() throws Exception {
 +    final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
 +    final String rgnName = getUniqueName();
 +    AttributesFactory factory = new AttributesFactory();
 +    factory.setScope(Scope.DISTRIBUTED_ACK);
 +    factory.setEarlyAck(false);
 +    factory.setCacheLoader(new CacheLoader() {
 +        public Object load(LoaderHelper helper) {
 +          return "val" + helper.getArgument();
 +        }
 +        public void close() {}
 +      });
 +    Region rgn = getCache().createRegion(rgnName, factory.create());
 +    
 +    Invoke.invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: intial configuration") {
 +        public void run() {
 +          try {
 +            AttributesFactory factory2 = new AttributesFactory();
 +            factory2.setScope(Scope.DISTRIBUTED_ACK);
 +            factory2.setEarlyAck(false);
 +            // factory.setDataPolicy(DataPolicy.REPLICATE);
 +            factory2.setMirrorType(MirrorType.KEYS);
 +            getCache().createRegion(rgnName, factory2.create());
 +          } catch (CacheException e) {
 +            Assert.fail("While creating region", e);
 +          }
 +        }
 +      });
 +
 +    // Confirm the standard case
 +    txMgr.begin();
 +    rgn.put("key1", "val1");
 +    txMgr.commit();
 +    assertEquals("val1", rgn.getEntry("key1").getValue());
 +
 +    Invoke.invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
 +        public void run() {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          assertEquals("val1", rgn1.getEntry("key1").getValue());
 +        }
 +      });
 +
 +    // Confirm loaded value case
 +    txMgr.begin();
 +    rgn.get("key2", new Integer(2));
 +    txMgr.commit();
 +    assertEquals("val2", rgn.getEntry("key2").getValue());
 +    
 +    Invoke.invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
 +        public void run() {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          assertEquals("val2", rgn1.getEntry("key2").getValue());
 +        }
 +      });
 +
 +    // This should use the ack w/ the lockid
 +    txMgr.begin();
 +    rgn.put("key3", "val3");
 +    rgn.get("key4", new Integer(4));
 +    txMgr.commit();
 +
 +    Invoke.invokeInEveryVM(new SerializableRunnable("testDACKLoadedMessage: confirm standard case") {
 +        public void run() {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          assertEquals("val3", rgn1.getEntry("key3").getValue());
 +          assertEquals("val4", rgn1.getEntry("key4").getValue());
 +        }
 +      });
 +
 +  }
 +  
 +  @Override
 +  public Properties getDistributedSystemProperties() {
 +    Properties p = super.getDistributedSystemProperties();
 +    p.put("log-level", LogWriterUtils.getDUnitLogLevel());
 +    return p;
 +  }
 +
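 +  /**
 +   * Crash the transaction origin at different points in the commit protocol and
 +   * verify the outcome: no commit when the origin dies part-way through sending
 +   * or before any commit-process message goes out, but a completed commit once
 +   * at least one commit-process message has been sent.
 +   */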
 +  public void testHighAvailabilityFeatures() throws Exception {
 +    IgnoredException.addIgnoredException("DistributedSystemDisconnectedException");
 +//    final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
 +//    final TXManagerImpl txMgrImpl = (TXManagerImpl) txMgr;
 +    final String rgnName = getUniqueName();
 +    AttributesFactory factory = new AttributesFactory();
 +    factory.setScope(Scope.DISTRIBUTED_ACK);
 +    factory.setEarlyAck(false);
 +    Region rgn = getCache().createRegion(rgnName, factory.create());
 +    Invoke.invokeInEveryVM(new SerializableRunnable("testHighAvailabilityFeatures: intial region configuration") {
 +        public void run() {
 +          try {
 +            AttributesFactory factory2 = new AttributesFactory();
 +            factory2.setScope(Scope.DISTRIBUTED_ACK);
 +            factory2.setEarlyAck(false);
 +            factory2.setDataPolicy(DataPolicy.REPLICATE);
 +            getCache().createRegion(rgnName, factory2.create());
 +          } catch (CacheException e) {
 +            Assert.fail("While creating region", e);
 +          }
 +        }
 +      });
 +
 +    // create entries
 +    rgn.put("key0", "val0_0");
 +    rgn.put("key1", "val1_0");
 +
 +    Host host = Host.getHost(0);
 +    // This test assumes that there are at least three VMs; the origin and two recipients
 +    assertTrue(host.getVMCount() >= 3);
 +    final VM originVM = host.getVM(0);
 +
 +    // Test that there is no commit after a partial commit message
 +    // send (only sent to a minority of the recipients)
 +    originVM.invoke(new SerializableRunnable("Flakey DuringIndividualSend Transaction") {
 +        public void run() {
 +          final Region rgn1 = getCache().getRegion(rgnName);
 +          assertNotNull(rgn1);
 +          try {
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            final CacheTransactionManager txMgrImpl = txMgr2;
 +            
 +            txMgr2.begin();
 +            // 1. setup an internal callback on originVM that will call
 +            // disconnectFromDS() on the 2nd duringIndividualSend
 +            // call.
 +            ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
 +            TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
 +            txState.setDuringIndividualSend(new Runnable() {
 +                private int numCalled = 0;
 +                public synchronized void run() {
 +                  ++numCalled;
 +                  rgn1.getCache().getLogger().info("setDuringIndividualSend Runnable called " + numCalled + " times");
 +                  if (numCalled > 1) {
 +                    MembershipManagerHelper.crashDistributedSystem(getSystem());
 +                  }
 +                }
 +              });
 +            rgn1.put("key0", "val0_1");
 +            rgn1.put("key1", "val1_1");
 +            // 2. commit a transaction in originVM, it will disconnect from the DS
 +            txMgr2.commit();
 +          } 
 +          catch (VirtualMachineError e) {
 +            SystemFailure.initiateFailure(e);
 +            throw e;
 +          }
 +          catch (Throwable e) {
 +            rgn1.getCache().getLogger().warning("Ignoring Exception", e);
 +          } 
 +          finally {
 +            // Allow this VM to re-connect to the DS upon getCache() call
 +            closeCache();
 +          }
 +        }
 +      });
 +    // 3. verify on all VMs that the transaction was not committed
 +    final SerializableRunnable noChangeValidator = 
 +      new SerializableRunnable("testHighAvailabilityFeatures: validate no change in Region") {
 +        public void run() {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          if (rgn1 == null) {
 +            // Expect a null region from originVM
 +            try {
 +              AttributesFactory factory2 = new AttributesFactory();
 +              factory2.setScope(Scope.DISTRIBUTED_ACK);
 +              factory2.setEarlyAck(false);
 +              factory2.setDataPolicy(DataPolicy.REPLICATE);
 +              rgn1 = getCache().createRegion(rgnName, factory2.create());
 +            } catch (CacheException e) {
 +              Assert.fail("While creating region", e);
 +            }
 +          }
 +          Region.Entry re = rgn1.getEntry("key0");
 +          assertNotNull(re);
 +          assertEquals("val0_0", re.getValue());
 +          re = rgn1.getEntry("key1");
 +          assertNotNull(re);
 +          assertEquals("val1_0", re.getValue());
 +        }
 +      };
 +    Invoke.invokeInEveryVM(noChangeValidator);
 +
 +    // Test that there is no commit after sending to all recipients
 +    // but prior to sending the "commit process" message
 +    originVM.invoke(new SerializableRunnable("Flakey AfterIndividualSend Transaction") {
 +        public void run() {
 +          final Region rgn1 = getCache().getRegion(rgnName);
 +          assertNotNull(rgn1);
 +          try {
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            final CacheTransactionManager txMgrImpl = txMgr2;
 +            
 +            txMgr2.begin();
 +            // 1. setup an internal callback on originVM that will call
 +            // disconnectFromDS() on AfterIndividualSend
 +            ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
 +            TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
 +            txState.setAfterIndividualSend(new Runnable() {
 +                public synchronized void run() {
 +                  MembershipManagerHelper.crashDistributedSystem(getSystem());
 +                }
 +              });
 +            rgn1.put("key0", "val0_2");
 +            rgn1.put("key1", "val1_2");
 +            // 2. commit a transaction in originVM, it will disconnect from the DS
 +            txMgr2.commit();
 +          } 
 +          catch (VirtualMachineError e) {
 +            SystemFailure.initiateFailure(e);
 +            throw e;
 +          }
 +          catch (Throwable e) {
 +            rgn1.getCache().getLogger().warning("Ignoring Exception", e);
 +          } 
 +          finally {
 +            // Allow this VM to re-connect to the DS upon getCache() call
 +            closeCache();
 +          }
 +        }
 +      });
 +    // 3. verify on all VMs, including the origin, that the transaction was not committed
 +    Invoke.invokeInEveryVM(noChangeValidator);
 +
 +    // Test commit success upon a single commit process message received.
 +    originVM.invoke(new SerializableRunnable("Flakey DuringIndividualCommitProcess Transaction") {
 +        public void run() {
 +          final Region rgn1 = getCache().getRegion(rgnName);
 +          assertNotNull(rgn1);
 +          try {
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            final CacheTransactionManager txMgrImpl = txMgr2;
 +            
 +            txMgr2.begin();
 +
 +            ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
 +            TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
 +            // 1. setup an internal callback on originVM that will call
 +            // disconnectFromDS() on the 2nd internalDuringIndividualCommitProcess
 +            // call.
 +            txState.setDuringIndividualCommitProcess(new Runnable() {
 +                private int numCalled = 0;
 +                public synchronized void run() {
 +                  ++numCalled;
 +                  rgn1.getCache().getLogger().info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
 +                  if (numCalled > 1) {
 +                    MembershipManagerHelper.crashDistributedSystem(getSystem());
 +                  }
 +                }
 +              });
 +            rgn1.put("key0", "val0_3");
 +            rgn1.put("key1", "val1_3");
 +            // 2. commit a transaction in originVM, it will disconnect from the DS
 +            txMgr2.commit();
 +          } 
 +          catch (VirtualMachineError e) {
 +            SystemFailure.initiateFailure(e);
 +            throw e;
 +          }
 +          catch (Throwable e) {
 +            rgn1.getCache().getLogger().warning("Ignoring Exception", e);
 +          } 
 +          finally {
 +            // Allow this VM to re-connect to the DS upon getCache() call
 +            closeCache();
 +          }
 +        }
 +      });
 +    // 3. verify on all VMs that the transaction was committed (including the origin, due to GII)
 +    SerializableRunnable nonSoloChangeValidator1 = 
 +      new SerializableRunnable("testHighAvailabilityFeatures: validate v1 non-solo Region changes") {
 +        public void run() {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          if (rgn1 == null) {
 +            // Expect a null region from originVM
 +            try {
 +              AttributesFactory factory2 = new AttributesFactory();
 +              factory2.setScope(Scope.DISTRIBUTED_ACK);
 +              factory2.setEarlyAck(false);
 +              factory2.setDataPolicy(DataPolicy.REPLICATE);
 +              rgn1 = getCache().createRegion(rgnName, factory2.create());
 +            } catch (CacheException e) {
 +              Assert.fail("While creating region", e);
 +            }
 +          }
 +          long giveUp = System.currentTimeMillis() + 10000;
 +          while (giveUp > System.currentTimeMillis()) {
 +            try {
 +              Region.Entry re = rgn1.getEntry("key0");
 +              assertNotNull(re);
 +              assertEquals("val0_3", re.getValue());
 +              re = rgn1.getEntry("key1");
 +              assertNotNull(re);
 +              assertEquals("val1_3", re.getValue());
 +              break;
 +            } catch (AssertionFailedError e) {
 +              if (giveUp > System.currentTimeMillis()) {
 +                throw e;
 +              }
 +            }
 +          }
 +        }
 +      };
 +    Invoke.invokeInEveryVM(nonSoloChangeValidator1);
 +
 +    // Verify successful solo region commit after duringIndividualSend
 +    // (same as afterIndividualSend).
 +    // Create a region that only exists on the origin and another VM
 +    final String soloRegionName = getUniqueName() + "_solo";
 +    SerializableRunnable createSoloRegion = 
 +      new SerializableRunnable("testHighAvailabilityFeatures: solo region configuration") {
 +        public void run() {
 +          try {
 +            AttributesFactory factory2 = new AttributesFactory();
 +            factory2.setScope(Scope.DISTRIBUTED_ACK);
 +            factory2.setEarlyAck(false);
 +            factory2.setDataPolicy(DataPolicy.REPLICATE);
 +            Region rgn1 = getCache().createRegion(soloRegionName, factory2.create());
 +            rgn1.put("soloKey0", "soloVal0_0");
 +            rgn1.put("soloKey1", "soloVal1_0");
 +          } catch (CacheException e) {
 +            Assert.fail("While creating region", e);
 +          }
 +        }
 +      };
 +    final VM soloRegionVM = host.getVM(1);
 +    originVM.invoke(createSoloRegion);
 +    soloRegionVM.invoke(createSoloRegion);
 +    originVM.invoke(new SerializableRunnable("Flakey solo region DuringIndividualSend Transaction") {
 +        public void run() {
 +          final Region soloRgn = getCache().getRegion(soloRegionName);
 +          assertNotNull(soloRgn);
 +          try {
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            final CacheTransactionManager txMgrImpl = txMgr2;
 +            
 +            txMgr2.begin();
 +            // 1. setup an internal callback on originVM that will call
 +            // disconnectFromDS() on the 2nd duringIndividualSend
 +            // call.
 +            ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
 +            TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
 +            txState.setDuringIndividualSend(new Runnable() {
 +                private int numCalled = 0;
 +                public synchronized void run() {
 +                  ++numCalled;
 +                  soloRgn.getCache().getLogger().info("setDuringIndividualSend Runnable called " + numCalled + " times");
 +                  if (numCalled > 1) {
 +                    MembershipManagerHelper.crashDistributedSystem(getSystem());
 +                  }
 +                }
 +              });
 +            soloRgn.put("soloKey0", "soloVal0_1");
 +            soloRgn.put("soloKey1", "soloVal1_1");
 +            // 2. commit a transaction in originVM, it will disconnect from the DS
 +            txMgr2.commit();
 +          } 
 +          catch (VirtualMachineError e) {
 +            SystemFailure.initiateFailure(e);
 +            throw e;
 +          }
 +          catch (Throwable e) {
 +            soloRgn.getCache().getLogger().warning("Ignoring Exception", e);
 +          } 
 +          finally {
 +            // Allow this VM to re-connect to the DS upon getCache() call
 +            closeCache();
 +          }
 +        }
 +      });
 +    // 3. verify on the soloRegionVM that the transaction was committed
 +    final SerializableRunnable soloRegionCommitValidator1 = 
 +      new SerializableRunnable("testHighAvailabilityFeatures: validate successful v1 commit in solo Region") {
 +        public void run() {
 +          Region soloRgn = getCache().getRegion(soloRegionName);
 +          if (soloRgn == null) {
 +            // Expect a null region from originVM
 +            try {
 +              AttributesFactory factory2 = new AttributesFactory();         
 +              factory2.setScope(Scope.DISTRIBUTED_ACK);
 +              factory2.setEarlyAck(false);
 +              factory2.setDataPolicy(DataPolicy.REPLICATE);
 +              soloRgn = getCache().createRegion(soloRegionName, factory2.create());
 +            } catch (CacheException e) {
 +              Assert.fail("While creating region ", e);
 +            }
 +          }
 +          Region.Entry re = soloRgn.getEntry("soloKey0");
 +          assertNotNull(re);
 +          assertEquals("soloVal0_1", re.getValue());
 +          re = soloRgn.getEntry("soloKey1");
 +          assertNotNull(re);
 +          assertEquals("soloVal1_1", re.getValue());
 +        }
 +      };
 +    originVM.invoke(soloRegionCommitValidator1);
 +    soloRegionVM.invoke(soloRegionCommitValidator1);
 +    // verify no change in nonSolo region, re-establish region in originVM
 +    Invoke.invokeInEveryVM(nonSoloChangeValidator1);
 +
 +    // Verify no commit for failed send (afterIndividualSend) for solo
 +    // Region combined with non-solo Region
 +    originVM.invoke(new SerializableRunnable("Flakey mixed (solo+non-solo) region DuringIndividualSend Transaction") {
 +        public void run() {
 +          final Region rgn1 = getCache().getRegion(rgnName);
 +          assertNotNull(rgn1);
 +          final Region soloRgn = getCache().getRegion(soloRegionName);
 +          assertNotNull(soloRgn);
 +          try {
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            final CacheTransactionManager txMgrImpl = txMgr2;
 +            
 +            txMgr2.begin();
 +            // 1. setup an internal callback on originVM that will call
 +            // disconnectFromDS() on the afterIndividualSend
 +            // call.
 +            ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
 +            TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
 +            txState.setAfterIndividualSend(new Runnable() {
 +                public synchronized void run() {
 +                  MembershipManagerHelper.crashDistributedSystem(getSystem());
 +                }
 +              });
 +
 +            rgn1.put("key0", "val0_4");
 +            rgn1.put("key1", "val1_4");
 +            soloRgn.put("soloKey0", "soloVal0_2");
 +            soloRgn.put("soloKey1", "soloVal1_2");
 +
 +            // 2. commit a transaction in originVM, it will disconnect from the DS
 +            txMgr2.commit();
 +          } 
 +          catch (VirtualMachineError e) {
 +            SystemFailure.initiateFailure(e);
 +            throw e;
 +          }
 +          catch (Throwable e) {
 +            rgn1.getCache().getLogger().warning("Ignoring Exception", e);
 +          } 
 +          finally {
 +            // Allow this VM to re-connect to the DS upon getCache() call
 +            closeCache();
 +          }
 +        }
 +      });
 +    // Origin and Solo Region VM should be the same as last validation
 +    originVM.invoke(soloRegionCommitValidator1);
 +    soloRegionVM.invoke(soloRegionCommitValidator1);
 +    Invoke.invokeInEveryVM(nonSoloChangeValidator1);
 +
 +    // Verify commit after sending a single
 +    // (duringIndividualCommitProcess) commit process for solo Region
 +    // combined with non-solo Region
 +    originVM.invoke(new SerializableRunnable("Flakey mixed (solo+non-solo) region DuringIndividualCommitProcess Transaction") {
 +        public void run() {
 +          final Region rgn1 = getCache().getRegion(rgnName);
 +          assertNotNull(rgn1);
 +          final Region soloRgn = getCache().getRegion(soloRegionName);
 +          assertNotNull(soloRgn);
 +          try {
 +            final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +            final CacheTransactionManager txMgrImpl = txMgr2;
 +            
 +            txMgr2.begin();
 +            // 1. setup an internal callback on originVM that will call
 +            // disconnectFromDS() on the afterIndividualSend
 +            // call.
 +            ((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).forceLocalBootstrap();
 +            TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgrImpl).getTXState()).getRealDeal(null,null);
 +            txState.setAfterIndividualSend(new Runnable() {
 +                private int numCalled = 0;
 +                public synchronized void run() {
 +                  ++numCalled;
 +                  rgn1.getCache().getLogger().info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
 +                  if (numCalled > 1) {
 +                    MembershipManagerHelper.crashDistributedSystem(getSystem());
 +                  }
 +                }
 +              });
 +
 +            rgn1.put("key0", "val0_5");
 +            rgn1.put("key1", "val1_5");
 +            soloRgn.put("soloKey0", "soloVal0_3");
 +            soloRgn.put("soloKey1", "soloVal1_3");
 +
 +            // 2. commit a transaction in originVM, it will disconnect from the DS
 +            txMgr2.commit();
 +          } 
 +          catch (VirtualMachineError e) {
 +            SystemFailure.initiateFailure(e);
 +            throw e;
 +          }
 +          catch (Throwable e) {
 +            rgn1.getCache().getLogger().warning("Ignoring Exception", e);
 +          } 
 +          finally {
 +            // Allow this VM to re-connect to the DS upon getCache() call
 +            closeCache();
 +          }
 +        }
 +      });
 +    final SerializableRunnable soloRegionCommitValidator2 = 
 +      new SerializableRunnable("testHighAvailabilityFeatures: validate successful v2 commit in solo Region") {
 +        public void run() {
 +          Region soloRgn = getCache().getRegion(soloRegionName);
 +          if (soloRgn == null) {
 +            // Expect a null region from originVM
 +            try {
 +              AttributesFactory factory2 = new AttributesFactory();         
 +              factory2.setScope(Scope.DISTRIBUTED_ACK);
 +              factory2.setEarlyAck(false);
 +              factory2.setDataPolicy(DataPolicy.REPLICATE);
 +              soloRgn = getCache().createRegion(soloRegionName, factory2.create());
 +            } catch (CacheException e) {
 +              Assert.fail("While creating region ", e);
 +            }
 +          }
 +          Region.Entry re = soloRgn.getEntry("soloKey0");
 +          assertNotNull(re);
 +          assertEquals("soloVal0_3", re.getValue());
 +          re = soloRgn.getEntry("soloKey1");
 +          assertNotNull(re);
 +          assertEquals("soloVal1_3", re.getValue());
 +        }
 +      };
 +    originVM.invoke(soloRegionCommitValidator2);
 +    soloRegionVM.invoke(soloRegionCommitValidator2);
 +    SerializableRunnable nonSoloChangeValidator2 = 
 +      new SerializableRunnable("testHighAvailabilityFeatures: validate v2 non-solo Region changes") {
 +        public void run() {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          if (rgn1 == null) {
 +            // Expect a null region from originVM
 +            try {
 +              AttributesFactory factory2 = new AttributesFactory();
 +              factory2.setScope(Scope.DISTRIBUTED_ACK);
 +              factory2.setEarlyAck(false);
 +              factory2.setDataPolicy(DataPolicy.REPLICATE);
 +              rgn1 = getCache().createRegion(rgnName, factory2.create());
 +            } catch (CacheException e) {
 +              Assert.fail("While creating region", e);
 +            }
 +          }
 +          Region.Entry re = rgn1.getEntry("key0");
 +          assertNotNull(re);
 +          assertEquals("val0_5", re.getValue());
 +          re = rgn1.getEntry("key1");
 +          assertNotNull(re);
 +          assertEquals("val1_5", re.getValue());
 +        }
 +      };
 +    Invoke.invokeInEveryVM(nonSoloChangeValidator2);
 +  }
 +  
 +  /** 
 +   * A class used in testLockBatchParticipantsUpdate to pause a transaction in the
 +   * afterReservation and afterSend states.
 +   */
 +  static public class PausibleTX implements Runnable {
 +    public boolean isRunning = false;
 +    public String rgnName = null;
 +    public Cache myCache = null;
 +    public Object key = null;
 +    public Object value = null;
 +
 +    public boolean getIsRunning() {
 +      return this.isRunning;
 +    }
 +
 +    public void run() {
 +      Region rgn = this.myCache.getRegion(this.rgnName);
 +      final CacheTransactionManager txMgr = this.myCache.getCacheTransactionManager();
 +      txMgr.begin();
 +      ((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).forceLocalBootstrap();
 +      TXState txState = (TXState)((TXStateProxyImpl)((TXManagerImpl)txMgr).getTXState()).getRealDeal(null,null);
 +      txState.setAfterReservation(new Runnable() {
 +        public void run() {
 +          try {
 +            synchronized (PausibleTX.class) {
 +              PausibleTX.this.isRunning = true;
 +              // Notify the thread that created this, that we are ready
 +              PausibleTX.class.notifyAll();
 +              // Wait for the controller to start a GII and let us proceed
 +              PausibleTX.class.wait();
 +            }
 +          }
 +          catch (InterruptedException ie) {
 +//            PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
 +            fail("interrupted");
 +          }
 +        }
 +      });
 +      txState.setAfterSend(new Runnable() {
 +        public void run() {
 +          try {
 +            synchronized (PausibleTX.class) {
 +              // Notify the controller that we have sent the TX data (and the
 +              // update)
 +              PausibleTX.class.notifyAll();
 +              // Wait until the controller has determined in fact the update
 +              // took place
 +              PausibleTX.class.wait();
 +            }
 +          }
 +          catch (InterruptedException ie) {
 +//            PausibleTX.this.myCache.getLogger().info("Why was I interrupted? " + ie);
 +            fail("interrupted");
 +          }
 +        }
 +      });
 +      try { 
 +        rgn.put(key, value);
 +        txMgr.commit();
 +      } catch (CommitConflictException cce) {
 +        fail("Did not expect commit conflict exception when sending updates to new members in PausibleTX" + cce);
 +//      } catch (CacheException ce) {
 +//        fail("Did not expect cache exception when sending updates to new members in PausibleTX" + ce);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Returns the GemFire system ID of the VM on which this method is run
 +   */
 +  public static Serializable getSystemId() {
 +    Serializable ret = null;
 +    if (DistributedTestCase.system != null) {
 +      ret = DistributedTestCase.system.getDistributionManager().getId();
 +    }
 +    return ret;
 +  }
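 +
 +  // Member ids (and the tx host's id) pushed to the grantor VM so it can check
 +  // the lock batch participants before and after the new member's GII.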
 +  static HashSet preTXSystemIds;
 +  public static void setPreTXSystemIds(HashSet ids) {
 +    TXDistributedDUnitTest.preTXSystemIds = ids;
 +  }
 +  static HashSet postTXSystemIds;
 +  public static void setPostTXSystemIds(HashSet ids) {
 +    TXDistributedDUnitTest.postTXSystemIds = ids;
 +  }
 +  static Serializable txHostId;
 +  public static void setTXHostSystemId(Serializable id) {
 +    TXDistributedDUnitTest.txHostId = id;
 +  }
 +  /**
 +   * Test update of lock batch participants (needed when new members are
 +   * discovered between a commit's locking phase and the application of the
 +   * Region's data). See bug 32999.
 +   */
 +  public void testLockBatchParticipantsUpdate() throws Exception {
 +//    final CacheTransactionManager txMgr = this.getCache().getCacheTransactionManager();
 +    final String rgnName = getUniqueName();
 +    Region rgn = getCache().createRegion(rgnName, getRegionAttributes());
 +    rgn.create("key", null);
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    SerializableRunnable initRegions = 
 +      new SerializableRunnable("testLockBatchParticipantsUpdate: initial configuration") {
 +      public void run() {
 +        try {
 +          Region rgn1 = getCache().createRegion(rgnName, getRegionAttributes());
 +          rgn1.create("key", null);
 +        } catch (CacheException e) {
 +          Assert.fail("While creating region", e);
 +        }
 +      }
 +    };
 +    vm0.invoke(initRegions);
 +    vm1.invoke(initRegions);
 +    rgn.put("key", "val1");
 +
 +    // Connect vm2 also since it may have been shutdown when logPerTest
 +    // is turned on
 +    vm2.invoke(new SerializableRunnable("connect vm2 if not connected") {
 +      public void run() {
 +        getCache();
 +      }
 +    });
 +
 +    // Make VM0 the Grantor 
 +    vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: remote grantor init") {
 +      public void run() {
 +        try {
 +          Region rgn1 = getCache().getRegion(rgnName);
 +          final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
 +          assertEquals("val1", rgn1.getEntry("key").getValue());
 +          txMgr2.begin();
 +          rgn1.put("key", "val2");
 +          txMgr2.commit();
 +          assertNotNull(TXLockService.getDTLS());
 +          assertTrue(TXLockService.getDTLS().isLockGrantor());
 +        } catch (CacheException e) {
 +          fail("While performing first transaction");
 +        }
 +      }
 +    });
 +    
 +    // fix for bug 38843 causes the DTLS to be created in every TX participant
 +    assertNotNull(TXLockService.getDTLS());
 +    assertFalse(TXLockService.getDTLS().isLockGrantor());
 +    assertEquals("val2", rgn.getEntry("key").getValue());
 +
 +    // Build sets of System Ids and set them up on VM0 for future batch member checks
 +    HashSet txMembers = new HashSet(4);
 +    txMembers.add(getSystemId());
-     txMembers.add(vm0.invoke(TXDistributedDUnitTest.class, "getSystemId"));
-     vm0.invoke(TXDistributedDUnitTest.class, "setPreTXSystemIds", new Object[] {txMembers});
-     txMembers.add(vm2.invoke(TXDistributedDUnitTest.class, "getSystemId"));
-     vm0.invoke(TXDistributedDUnitTest.class, "setPostTXSystemIds", new Object[] {txMembers});
++    txMembers.add(vm0.invoke(() -> TXDistributedDUnitTest.getSystemId()));
++    vm0.invoke(() -> TXDistributedDUnitTest.setPreTXSystemIds(txMembers));
++    txMembers.add(vm2.invoke(() -> TXDistributedDUnitTest.getSystemId()));
++    vm0.invoke(() -> TXDistributedDUnitTest.setPostTXSystemIds(txMembers));
 +
 +    // Don't include the tx host in the batch member set(s)
-     Serializable vm1HostId = (Serializable) vm1.invoke(TXDistributedDUnitTest.class, "getSystemId");
-     vm0.invoke(TXDistributedDUnitTest.class, "setTXHostSystemId", new Object[] {vm1HostId});
++    Serializable vm1HostId = (Serializable) vm1.invoke(() -> TXDistributedDUnitTest.getSystemId());
++    vm0.invoke(() -> TXDistributedDUnitTest.setTXHostSystemId(vm1HostId));
 +
 +    // Create a TX on VM1 (such that it will ask for locks on VM0) that uses the callbacks
 +    // to pause and give us time to start a GII process on another VM
 +    vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: slow tx (one that detects new member)") {
 +      public void run() {
 +        // fix for bug 38843 causes the DTLS to be created in every TX participant
 +        assertNotNull(TXLockService.getDTLS());
 +        assertFalse(TXLockService.getDTLS().isLockGrantor());
 +        
 +        PausibleTX pauseTXRunnable = new PausibleTX();
 +        pauseTXRunnable.rgnName = rgnName;
 +        pauseTXRunnable.myCache = getCache();
 +        pauseTXRunnable.key = "key";
 +        pauseTXRunnable.value = "val3";
 +        new Thread(pauseTXRunnable, "PausibleTX Thread").start();
 +        synchronized(PausibleTX.class) {
 +          while(!pauseTXRunnable.getIsRunning()) {
 +            try {
 +              PausibleTX.class.wait();
 +            }
 +            catch (InterruptedException ie) {
 +              fail("Did not expect " + ie);
 +            }
 +          }
 +        }
 +      }
 +    });
 +
 +    // Verify that the lock batch exists on VM0 and has the size we expect
 +    vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Verify lock batch exists on VM0 with expected size") {
 +      public void run() {
 +        getCache().getRegion(rgnName);
 +        TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
 +        assertNotNull(dtls);
 +        assertTrue(dtls.isLockGrantor());
 +        DLockService dLockSvc = dtls.getInternalDistributedLockService();
 +        assertNotNull(TXDistributedDUnitTest.txHostId);
 +        DLockBatch[] batches = dLockSvc.getGrantor().getLockBatches((InternalDistributedMember)TXDistributedDUnitTest.txHostId);
 +        assertEquals(batches.length, 1);
 +        TXLockBatch txLockBatch = (TXLockBatch) batches[0];
 +        assertNotNull(txLockBatch);
 +        assertNotNull(TXDistributedDUnitTest.preTXSystemIds);
 +        assertTrue("Members in lock batch " + txLockBatch.getParticipants() + " not the same as " + TXDistributedDUnitTest.preTXSystemIds, 
 +                   txLockBatch.getParticipants().equals(TXDistributedDUnitTest.preTXSystemIds));
 +      }
 +    });
 +    
 +    // Start a GII process on VM2
 +    vm2.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: start GII") {
 +      public void run() {
 +        try {
 +          AttributesFactory factory = new AttributesFactory();
 +          factory.setScope(Scope.DISTRIBUTED_ACK);
 +          factory.setEarlyAck(false);
 +          factory.setDataPolicy(DataPolicy.REPLICATE);
 +          getCache().createRegion(rgnName, factory.create());
 +        } catch (CacheException e) {
 +          Assert.fail("While creating region", e);
 +        }
 +      }
 +    });
 +
 +    // Notify TX on VM1 so that it can continue
 +    vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to continue") {
 +      public void run() {
 +        synchronized(PausibleTX.class) {
 +          // Notify VM1 that it should proceed to the TX send
 +          PausibleTX.class.notifyAll();
 +          // Wait until VM1 has sent the TX
 +          try {
 +            PausibleTX.class.wait();
 +          }
 +          catch (InterruptedException ie) {
 +            fail("Did not expect " + ie);
 +          }
 +        }
 +      }
 +    });
 +    
 +    // Verify that the batch on VM0 has added VM2 into the set
 +    vm0.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Verify lock batch contains VM2") {
 +      public void run() {
 +        getCache().getRegion(rgnName);
 +        TXLockServiceImpl dtls = (TXLockServiceImpl) TXLockService.getDTLS();
 +        assertNotNull(dtls);
 +        assertTrue(dtls.isLockGrantor());
 +        DLockService dLockSvc = dtls.getInternalDistributedLockService();
 +        assertNotNull(TXDistributedDUnitTest.txHostId);
 +        DLockBatch[] batches = dLockSvc.getGrantor().getLockBatches((InternalDistributedMember)TXDistributedDUnitTest.txHostId);
 +        assertEquals(batches.length, 1);
 +        TXLockBatch txLockBatch = (TXLockBatch) batches[0];
 +        assertNotNull(txLockBatch);
 +        assertNotNull(TXDistributedDUnitTest.preTXSystemIds);
 +        assertTrue("Members in lock batch " + txLockBatch.getParticipants() + " not the same as " + TXDistributedDUnitTest.postTXSystemIds, 
 +                   txLockBatch.getParticipants().equals(TXDistributedDUnitTest.postTXSystemIds));
 +      }
 +    });
 +    // fix for bug 38843 causes the DTLS to be created in every TX participant
 +    assertNotNull(TXLockService.getDTLS());
 +    assertFalse(TXLockService.getDTLS().isLockGrantor());
 +    assertEquals("val3", rgn.getEntry("key").getValue());
 +    
 +    
 +    // Notify TX on VM1 that it can go ahead and complete the TX
 +    vm1.invoke(new SerializableRunnable("testLockBatchParticipantsUpdate: Notfiy VM1 TX to finish") {
 +      public void run() {
 +        synchronized(PausibleTX.class) {
 +          // Notify VM1 that it should finish the TX
 +          PausibleTX.class.notifyAll();
 +        }
 +      }
 +    });
 +    
 +    
 +    rgn.destroyRegion();
 +  }
 +
 +  /**
 +   * Hitachi bug 38809: applying a transaction to a remote VM fails due to
 +   * an IOException on a Region configured for LRU Overflow
 +   */
 +  public static final String TROUBLE_KEY = "GoBoom";
 +  public static class TXTroubleMaker implements LocalRegion.TestCallable {
 +    // private final Region r;
 +    public void call(LocalRegion r, Operation op, RegionEntry re) {
 +      if (TROUBLE_KEY.equals(re.getKey())) {
 +        throw new DiskAccessException(TROUBLE_KEY, r);
 +      }
 +    }
 +  }
 +
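 +  /**
 +   * Resource listener that holds up handling of the CACHE_REMOVE event until
 +   * unblockShutdown() releases the latch.
 +   */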
 +  public static class ShutdownListener implements ResourceEventsListener {
 +    CountDownLatch latch = new CountDownLatch(1);
 +    @Override
 +    public void handleEvent(ResourceEvent event, Object resource) {
 +      if (event.equals(ResourceEvent.CACHE_REMOVE)) {
 +        try {
 +          latch.await();
 +        } catch (InterruptedException e) {
 +          e.printStackTrace();
 +        }
 +      }
 +    }
 +    public void unblockShutdown() {
 +      this.latch.countDown();
 +    }
 +  }
 +  
 +  @Ignore("Disabled for 51260")
 +  public void DISABLED_testRemoteCommitFailure() {
 +    try {
 +    disconnectAllFromDS();
 +    final String rgnName1 = getUniqueName() + "_1";
 +    final String rgnName2 = getUniqueName() + "_2";
 +    final String diskStoreName = getUniqueName() + "_store";
 +    Host host = Host.getHost(0);
 +    VM origin = host.getVM(0);
 +    VM trouble1 = host.getVM(1);
 +    VM trouble2 = host.getVM(2);
 +    VM noTrouble = host.getVM(3);
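 +    // Roles: origin runs the transaction; trouble1 and trouble2 install a
 +    // TXTroubleMaker that throws DiskAccessException when TROUBLE_KEY is
 +    // committed; noTrouble hosts the same regions without the failure hook.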
 +    CacheSerializableRunnable initRegions =
 +      new CacheSerializableRunnable("Initialize no trouble regions") {
 +      @Override
 +      public void run2() {
 +        getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
 +        TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
 +        AttributesFactory af = new AttributesFactory();
 +        af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
 +        af.setScope(Scope.DISTRIBUTED_ACK);
 +        af.setDiskStoreName(diskStoreName);
 +        getCache().createRegion(rgnName1, af.create());
 +        getCache().createRegion(rgnName2, af.create());
 +      }
 +    };
 +    origin.invoke(initRegions);
 +    noTrouble.invoke(initRegions);
 +    SerializableRunnable initTroubleRegions =
 +      new CacheSerializableRunnable("Initialize regions that cause trouble") {
 +      @Override
 +      public void run2() {
 +        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +        InternalRegionArguments ira = new InternalRegionArguments().setTestCallable(new TXTroubleMaker());
 +        try {
 +          getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
 +          TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
 +          AttributesFactory af = new AttributesFactory();
 +          af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
 +          af.setScope(Scope.DISTRIBUTED_ACK);
 +          af.setDiskStoreName(diskStoreName);
 +          gfc.createVMRegion(rgnName1, af.create(), ira);
 +          gfc.createVMRegion(rgnName2, af.create(), ira);
 +          gfc.getDistributedSystem().addResourceListener(new ShutdownListener());
 +        } catch (IOException ioe) {
 +          fail(ioe.toString());
 +        }
 +        catch (TimeoutException e) {
 +          fail(e.toString());
 +        }
 +        catch (ClassNotFoundException e) {
 +          fail(e.toString());
 +        }
 +      }
 +    };
 +    trouble1.invoke(initTroubleRegions);
 +    trouble2.invoke(initTroubleRegions);
 +
 +    SerializableRunnable doTransaction =
 +      new CacheSerializableRunnable("Run failing transaction") {
 +      @Override
 +      public void run2() {
 +        Cache c = getCache();
 +        Region r1 = c.getRegion(rgnName1);
 +        assertNotNull(r1);
 +        Region r2 = c.getRegion(rgnName2);
 +        assertNotNull(r2);
 +        CacheTransactionManager txmgr = c.getCacheTransactionManager();
 +        txmgr.begin();
 +        r1.put("k1", "k1");
 +        r1.put("k2", "k2");
 +        r1.put(TROUBLE_KEY, TROUBLE_KEY);
 +        r2.put("k1", "k1");
 +        r2.put("k2", "k2");
 +        r2.put(TROUBLE_KEY, TROUBLE_KEY);
 +        try {
 +          txmgr.commit();
 +          fail("Expected an tx incomplete exception");
 +        } catch (CommitIncompleteException yay) {
 +          String msg = yay.getMessage();
 +//          getLogWriter().info("failing exception", yay);
 +          // Each region name should be mentioned once per trouble VM (two matches each)
 +          int ind=0, match=0;
 +          while((ind = msg.indexOf(rgnName1, ind)) >= 0) {
 +            ind++; match++;
 +          }
 +          assertEquals(2, match);
 +          ind=match=0;
 +          while((ind = msg.indexOf(rgnName2, ind)) >= 0) {
 +            ind++; match++;
 +          }
 +          assertEquals(2, match);
 +          // DiskAccessException should be mentioned four times (two trouble VMs x two regions)
 +          ind=match=0;
 +          while((ind = msg.indexOf(DiskAccessException.class.getName(), ind)) >= 0) {
 +            ind++; match++;
 +          }
 +          assertEquals(4, match);
 +        }
 +      }
 +    };
 +    IgnoredException ee = null;
 +    try {
 +      ee = IgnoredException.addIgnoredException(DiskAccessException.class.getName() + "|" +
 +          CommitIncompleteException.class.getName() + "|" +
 +          CommitReplyException.class.getName());
 +      origin.invoke(doTransaction);
 +    } finally {
 +      if (ee!=null) ee.remove();
 +    }
 +
 +    SerializableCallable allowCacheToShutdown = new SerializableCallable() {
 +      @Override
 +      public Object call() throws Exception {
 +        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
 +        List<ResourceEventsListener> listeners = cache.getDistributedSystem().getResourceListeners();
 +        for (ResourceEventsListener l : listeners) {
 +          if (l instanceof ShutdownListener) {
 +            ShutdownListener shutListener = (ShutdownListener) l;
 +            shutListener.unblockShutdown();
 +          }
 +        }
 +        return null;
 +      }
 +    };
 +    trouble1.invoke(allowCacheToShutdown);
 +    trouble2.invoke(allowCacheToShutdown);
 +
 +    // Assert proper content on failing VMs
 +    SerializableRunnable assertTroubledContent =
 +      new CacheSerializableRunnable("Assert partail commit data") {
 +      @Override
 +      public void run2() {
 +        final Cache c = getCache();
 +        Wait.waitForCriterion(new WaitCriterion() {
 +          @Override
 +          public boolean done() {
 +            return c.getRegion(rgnName1) == null;
 +          }
 +          
 +          @Override
 +          public String description() {
 +            return "waiting for region " + rgnName1 + " to be destroyed";
 +          }
 +        }, 30000, 1000, true);
 +        Region r2 = c.getRegion(rgnName2);
 +        assertNull(r2);
 +      }
 +    };
 +    trouble1.invoke(assertTroubledContent);
 +    trouble2.invoke(assertTroubledContent);
 +
 +    // Assert proper content on successful VMs
 +    SerializableRunnable assertSuccessfulContent =
 +      new CacheSerializableRunnable("Assert complete commit of data on successful VMs") {
 +      @Override
 +      public void run2() {
 +        Cache c = getCache();
 +        {
 +          Region r1 = c.getRegion(rgnName1);
 +          assertNotNull(r1);
 +          assertEquals("k1", r1.getEntry("k1").getValue());
 +          assertEquals("k2", r1.getEntry("k2").getValue());
 +          assertEquals(TROUBLE_KEY, r1.getEntry(TROUBLE_KEY).getValue());
 +        }
 +        {
 +          Region r2 = c.getRegion(rgnName2);
 +          assertNotNull(r2);
 +          assertEquals("k1", r2.getEntry("k1").getValue());
 +          assertEquals("k2", r2.getEntry("k2").getValue());
 +          assertEquals(TROUBLE_KEY, r2.getEntry(TROUBLE_KEY).getValue());
 +        }
 +      }
 +    };
 +    noTrouble.invoke(assertSuccessfulContent);
 +    
 +    // Assert that the transactional data survives on the originating VM
 +    SerializableRunnable assertOriginContent =
 +      new CacheSerializableRunnable("Assert data survives on origin VM") {
 +      @Override
 +      public void run2() {
 +        Cache c = getCache();
 +        {
 +          Region r1 = c.getRegion(rgnName1);
 +          assertNotNull(r1);
 +          assertNotNull(r1.getEntry("k1"));
 +          assertNotNull(r1.getEntry("k2"));
 +          assertNotNull(r1.getEntry(TROUBLE_KEY));
 +        }
 +        {
 +          Region r2 = c.getRegion(rgnName2);
 +          assertNotNull(r2);
 +          assertNotNull(r2.getEntry("k1"));
 +          assertNotNull(r2.getEntry("k2"));
 +          assertNotNull(r2.getEntry(TROUBLE_KEY));
 +        }
 +      }
 +    };
 +    origin.invoke(assertOriginContent);
 +    } finally {
 +      Invoke.invokeInEveryVM(new SerializableCallable() {
 +        @Override
 +        public Object call() throws Exception {
 +          TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
 +          return null;
 +        }
 +      });
 +    }
 +  }
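 +
 +  /**
 +   * Minimal illustrative sketch (a hypothetical helper, never invoked by the
 +   * tests above): the basic peer-to-peer transaction pattern these tests
 +   * build on, assuming a replicated region named "example" already exists.
 +   */
 +  private void exampleBasicTransactionSketch() {
 +    Region rgn = getCache().getRegion("example"); // hypothetical region name
 +    CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
 +    txMgr.begin();
 +    try {
 +      rgn.put("key", "value");
 +      txMgr.commit(); // the commit is distributed to the other members
 +    } catch (CommitConflictException conflict) {
 +      // a conflicting change on another member causes the commit to fail
 +    }
 +  }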
 +}
 +

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
index a253f09,0000000..b1d7bab
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/TXOrderDUnitTest.java
@@@ -1,434 -1,0 +1,434 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache30;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Iterator;
 +import java.util.List;
 +
 +import javax.naming.Context;
 +import javax.transaction.UserTransaction;
 +
 +import com.gemstone.gemfire.CopyHelper;
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.CacheEvent;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheListener;
 +import com.gemstone.gemfire.cache.CacheLoader;
 +import com.gemstone.gemfire.cache.CacheLoaderException;
 +import com.gemstone.gemfire.cache.CacheTransactionManager;
 +import com.gemstone.gemfire.cache.DataPolicy;
 +import com.gemstone.gemfire.cache.EntryEvent;
 +import com.gemstone.gemfire.cache.LoaderHelper;
 +import com.gemstone.gemfire.cache.PartitionAttributes;
 +import com.gemstone.gemfire.cache.PartitionAttributesFactory;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.Scope;
 +import com.gemstone.gemfire.cache.TransactionEvent;
 +import com.gemstone.gemfire.cache.TransactionListener;
 +import com.gemstone.gemfire.cache.query.IndexType;
 +import com.gemstone.gemfire.cache.query.Query;
 +import com.gemstone.gemfire.cache.query.QueryService;
 +import com.gemstone.gemfire.cache.query.SelectResults;
 +import com.gemstone.gemfire.cache.query.transaction.Person;
 +import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 +import com.gemstone.gemfire.cache.util.TransactionListenerAdapter;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 +import com.gemstone.gemfire.test.dunit.SerializableCallable;
 +import com.gemstone.gemfire.test.dunit.VM;
 +
 +/**
 + * Test the order of operations done on the far side of a tx.
 + *
 + * @author darrel
 + * @since 5.0
 + */
 +public class TXOrderDUnitTest extends CacheTestCase {
 +
 +  private transient Region r;
 +  private transient DistributedMember otherId;
 +  protected transient int invokeCount;
 +  
 +  public TXOrderDUnitTest(String name) {
 +    super(name);
 +  }
 +
 +  private VM getOtherVm() {
 +    Host host = Host.getHost(0);
 +    return host.getVM(0);
 +  }
 +    
 +  private void initOtherId() {
 +    VM vm = getOtherVm();
 +    vm.invoke(new CacheSerializableRunnable("Connect") {
 +        public void run2() throws CacheException {
 +          getCache();
 +        }
 +      });
-     this.otherId = (DistributedMember)vm.invoke(TXOrderDUnitTest.class, "getVMDistributedMember");
++    this.otherId = (DistributedMember)vm.invoke(() -> TXOrderDUnitTest.getVMDistributedMember());
 +  }
 +  private void doCommitOtherVm() {
 +    VM vm = getOtherVm();
 +    vm.invoke(new CacheSerializableRunnable("create root") {
 +        public void run2() throws CacheException {
 +          AttributesFactory af = new AttributesFactory();
 +          af.setScope(Scope.DISTRIBUTED_ACK);
 +          Region r1 = createRootRegion("r1", af.create());
 +          Region r2 = r1.createSubregion("r2", af.create());
 +          Region r3 = r2.createSubregion("r3", af.create());
 +          CacheTransactionManager ctm =  getCache().getCacheTransactionManager();
 +          ctm.begin();
 +          r2.put("b", "value1");
 +          r3.put("c", "value2");
 +          r1.put("a", "value3");
 +          r1.put("a2", "value4");
 +          r3.put("c2", "value5");
 +          r2.put("b2", "value6");
 +          ctm.commit();
 +        }
 +      });
 +  }
 +
 +  public static DistributedMember getVMDistributedMember() {
 +    return InternalDistributedSystem.getAnyInstance().getDistributedMember();
 +  }
 +  
 +  //////////////////////  Test Methods  //////////////////////
 +
 +  List expectedKeys;
 +  int clCount = 0;
 +
 +  Object getCurrentExpectedKey() {
 +    Object result = this.expectedKeys.get(this.clCount);
 +    this.clCount += 1;
 +    return result;
 +  }
 +  /**
 +   * Make sure listeners get invoked in the correct order on the far side of a tx.
 +   */
 +  public void testFarSideOrder() throws CacheException {
 +    initOtherId();
 +    AttributesFactory af = new AttributesFactory();
 +    af.setDataPolicy(DataPolicy.REPLICATE);
 +    af.setScope(Scope.DISTRIBUTED_ACK);
 +    CacheListener cl1 = new CacheListenerAdapter() {
 +        public void afterCreate(EntryEvent e) {
 +          assertEquals(getCurrentExpectedKey(), e.getKey());
 +        }
 +      };
 +    af.addCacheListener(cl1);
 +    Region r1 = createRootRegion("r1", af.create());
 +    Region r2 = r1.createSubregion("r2", af.create());
 +    r2.createSubregion("r3", af.create());
 +
 +    TransactionListener tl1 = new TransactionListenerAdapter() {
 +        public void afterCommit(TransactionEvent e) {
 +          assertEquals(6, e.getEvents().size());
 +          ArrayList keys = new ArrayList();
 +          Iterator it = e.getEvents().iterator();
 +          while (it.hasNext()) {
 +            EntryEvent ee = (EntryEvent)it.next();
 +            keys.add(ee.getKey());
 +            assertEquals(null, ee.getCallbackArgument());
 +            assertEquals(true, ee.isCallbackArgumentAvailable());
 +          }
 +          assertEquals(TXOrderDUnitTest.this.expectedKeys, keys);
 +          TXOrderDUnitTest.this.invokeCount = 1;
 +        }
 +      };
 +    CacheTransactionManager ctm =  getCache().getCacheTransactionManager();
 +    ctm.addListener(tl1);
 +
 +    this.invokeCount = 0;
 +    this.clCount = 0;
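 +    // keys listed in the order doCommitOtherVm() performs its puts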
 +    this.expectedKeys = Arrays.asList(new String[]{"b", "c", "a", "a2", "c2", "b2"});
 +    doCommitOtherVm();
 +    assertEquals(1, this.invokeCount);
 +    assertEquals(6, this.clCount);
 +  }
 +
 +  /**
 +   * test bug#40870
 +   * @throws Exception
 +   */
 +  public void _testFarSideOpForLoad() throws Exception {
 +    Host host = Host.getHost(0);
 +    VM vm1 = host.getVM(0);
 +    VM vm2 = host.getVM(1);
 +    vm1.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        AttributesFactory af = new AttributesFactory();
 +        af.setDataPolicy(DataPolicy.REPLICATE);
 +        af.setScope(Scope.DISTRIBUTED_ACK);
 +        CacheListener cl1 = new CacheListenerAdapter() {
 +          public void afterCreate(EntryEvent e) {
 +            assertTrue(e.getOperation().isLocalLoad());
 +          }
 +        };
 +        af.addCacheListener(cl1);
 +        CacheLoader cl = new CacheLoader() {
 +          public Object load(LoaderHelper helper) throws CacheLoaderException {
 +            LogWriterUtils.getLogWriter().info("Loading value:"+helper.getKey()+"_value");
 +            return helper.getKey()+"_value";
 +          }
 +          public void close() {
 +          }
 +        };
 +        af.setCacheLoader(cl);
 +        createRootRegion("r1", af.create());
 +        return null;
 +      }
 +    });
 +
 +    vm2.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        AttributesFactory af = new AttributesFactory();
 +        af.setDataPolicy(DataPolicy.REPLICATE);
 +        af.setScope(Scope.DISTRIBUTED_ACK);
 +        CacheListener cl1 = new CacheListenerAdapter() {
 +          public void afterCreate(EntryEvent e) {
 +            LogWriterUtils.getLogWriter().info("op:"+e.getOperation().toString());
 +            assertTrue(!e.getOperation().isLocalLoad());
 +          }
 +        };
 +        af.addCacheListener(cl1);
 +        createRootRegion("r1", af.create());
 +        return null;
 +      }
 +    });
 +
 +    vm1.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        Region r = getRootRegion("r1");
 +        getCache().getCacheTransactionManager().begin();
 +        r.get("obj_2");
 +        getCache().getCacheTransactionManager().commit();
 +        return null;
 +      }
 +    });
 +  }
 +
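 +  /**
 +   * Verify that transaction and cache listeners on a partitioned region are
 +   * handed events whose region is the user-visible region
 +   * ("/testTxEventForRegion") rather than an internal bucket region.
 +   */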
 +  public void testInternalRegionNotExposed() {
 +    Host host = Host.getHost(0);
 +    VM vm1 = host.getVM(0);
 +    VM vm2 = host.getVM(1);
 +    SerializableCallable createRegion = new SerializableCallable() {
 +      public Object call() throws Exception {
 +        ExposedRegionTransactionListener tl = new ExposedRegionTransactionListener();
 +        CacheTransactionManager ctm = getCache().getCacheTransactionManager();
 +        ctm.addListener(tl);
 +        ExposedRegionCacheListener cl = new ExposedRegionCacheListener();
 +        AttributesFactory af = new AttributesFactory();
 +        PartitionAttributes pa = new PartitionAttributesFactory()
 +          .setRedundantCopies(1)
 +          .setTotalNumBuckets(1)
 +          .create();
 +        af.setPartitionAttributes(pa);
 +        af.addCacheListener(cl);
 +        Region pr = createRootRegion("testTxEventForRegion", af.create());
 +        return null;
 +      }
 +    };
 +    vm1.invoke(createRegion);
 +    vm2.invoke(createRegion);
 +    vm1.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        Region pr = getRootRegion("testTxEventForRegion");
 +        CacheTransactionManager ctm = getCache().getCacheTransactionManager();
 +        pr.put(2, "tw");
 +        pr.put(3, "three");
 +        pr.put(4, "four");
 +        ctm.begin();
 +        pr.put(1, "one");
 +        pr.put(2, "two");
 +        pr.invalidate(3);
 +        pr.destroy(4);
 +        ctm.commit();
 +        return null;
 +      }
 +    });
 +    SerializableCallable verifyListener = new SerializableCallable() {
 +      public Object call() throws Exception {
 +        Region pr = getRootRegion("testTxEventForRegion");
 +        CacheTransactionManager ctm = getCache().getCacheTransactionManager();
 +        ExposedRegionTransactionListener tl = (ExposedRegionTransactionListener)ctm.getListeners()[0];
 +        ExposedRegionCacheListener cl = (ExposedRegionCacheListener)pr.getAttributes().getCacheListeners()[0];
 +        assertFalse(tl.exceptionOccurred);
 +        assertFalse(cl.exceptionOccurred);
 +        return null;
 +      }
 +    };
 +    vm1.invoke(verifyListener);
 +    vm2.invoke(verifyListener);
 +  }
 +  
 +  class ExposedRegionTransactionListener extends TransactionListenerAdapter {
 +    private boolean exceptionOccurred = false;
 +    @Override
 +    public void afterCommit(TransactionEvent event) {
 +      List<CacheEvent<?, ?>> events = event.getEvents();
 +      for (CacheEvent<?, ?> e : events) {
 +        if (!"/testTxEventForRegion".equals(e.getRegion().getFullPath())) {
 +          exceptionOccurred = true;
 +        }
 +      }
 +    }
 +  }
 +  class ExposedRegionCacheListener extends CacheListenerAdapter {
 +    private boolean exceptionOccurred = false;
 +    @Override
 +    public void afterCreate(EntryEvent event) {
 +      verifyRegion(event);
 +    }
 +    @Override
 +    public void afterUpdate(EntryEvent event) {
 +      verifyRegion(event);
 +    }
 +    private void verifyRegion(EntryEvent event) {
 +      if (!"/testTxEventForRegion".equals(event.getRegion().getFullPath())) {
 +        exceptionOccurred = true;
 +      }
 +    }
 +  }
 +  
 +  private final int TEST_PUT = 0;
 +  private final int TEST_INVALIDATE = 1;
 +  private final int TEST_DESTROY = 2;
 +  /**
 +   * Verify that queries on indexes work with transactions.
 +   * @throws Exception
 +   */
 +  public void testFarSideIndexOnPut() throws Exception {
 +    doTest(TEST_PUT);
 +  }
 +
 +  public void testFarSideIndexOnInvalidate() throws Exception {
 +    doTest(TEST_INVALIDATE);
 +  }
 +
 +  public void testFarSideIndexOnDestroy() throws Exception {
 +    doTest(TEST_DESTROY);
 +  }
 +
 +  private void doTest(final int op) throws Exception {
 +    Host host = Host.getHost(0);
 +    VM vm1 = host.getVM(0);
 +    VM vm2 = host.getVM(1);
 +    SerializableCallable createRegionAndIndex = new SerializableCallable() {
 +      public Object call() throws Exception {
 +        AttributesFactory af = new AttributesFactory();
 +        af.setDataPolicy(DataPolicy.REPLICATE);
 +        af.setScope(Scope.DISTRIBUTED_ACK);
 +        Region region = createRootRegion("sample", af.create());
 +        QueryService qs = getCache().getQueryService();
 +        qs.createIndex("foo", IndexType.FUNCTIONAL, "age", "/sample");
 +        return null;
 +      }
 +    };
 +    vm1.invoke(createRegionAndIndex);
 +    vm2.invoke(createRegionAndIndex);
 +    
 +    //do transactional puts in vm1
 +    vm1.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        Context ctx = getCache().getJNDIContext();
 +        UserTransaction utx = (UserTransaction)ctx.lookup("java:/UserTransaction");
 +        Region region = getRootRegion("sample");
 +        Integer x = new Integer(0);
 +        utx.begin();
 +        region.create(x, new Person("xyz", 45));
 +        utx.commit();
 +        QueryService qs = getCache().getQueryService();
 +        Query q = qs.newQuery("select * from /sample where age < 50");
 +        assertEquals(1, ((SelectResults)q.execute()).size());
 +        Person dsample = (Person)CopyHelper.copy(region.get(x));
 +        dsample.setAge(55);
 +        utx.begin();
 +        switch (op) {
 +        case TEST_PUT:
 +          region.put(x, dsample);
 +          break;
 +        case TEST_INVALIDATE:
 +          region.invalidate(x);
 +          break;
 +        case TEST_DESTROY:
 +          region.destroy(x);
 +          break;
 +        default:
 +          fail("unknown op");
 +        }
 +        utx.commit();
 +        assertEquals(0, ((SelectResults)q.execute()).size());
 +        return null;
 +      }
 +    });
 +    
 +    //run query and verify results in other vm
 +    vm2.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        QueryService qs = getCache().getQueryService();
 +        Query q = qs.newQuery("select * from /sample where age < 50");
 +        assertEquals(0, ((SelectResults)q.execute()).size());
 +        return null;
 +      }
 +    });
 +  }
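 +
 +  /**
 +   * Minimal illustrative sketch (a hypothetical helper, never invoked by the
 +   * tests): the commit-time visibility that doTest() above asserts, assuming
 +   * the "/sample" region and its functional index on "age" already exist.
 +   */
 +  private void exampleIndexVisibilitySketch() throws Exception {
 +    Region sample = getRootRegion("sample");
 +    Context ctx = getCache().getJNDIContext();
 +    UserTransaction utx = (UserTransaction) ctx.lookup("java:/UserTransaction");
 +    utx.begin();
 +    sample.put(new Integer(1), new Person("abc", 40)); // transactional create
 +    utx.commit();
 +    // after commit, index-backed queries reflect the new entry
 +    Query q = getCache().getQueryService()
 +        .newQuery("select * from /sample where age < 50");
 +    assertTrue(((SelectResults) q.execute()).size() >= 1);
 +  }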
 +  
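 +  /**
 +   * Test for bug 43353: a byte[] put and an invalidate performed in the same
 +   * transaction must both be applied on the far-side member.
 +   */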
 +  public void testBug43353() {
 +    Host host = Host.getHost(0);
 +    VM vm1 = host.getVM(0);
 +    VM vm2 = host.getVM(1);
 +    
 +    SerializableCallable createRegion = new SerializableCallable() {
 +      public Object call() throws Exception {
 +        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(getTestMethodName());
 +        return null;
 +      }
 +    };
 +    
 +    vm1.invoke(createRegion);
 +    vm2.invoke(createRegion);
 +    
 +    vm1.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        Region r = getCache().getRegion(getTestMethodName());
 +        r.put("ikey", "value");
 +        getCache().getCacheTransactionManager().begin();
 +        r.put("key1", new byte[20]);
 +        r.invalidate("ikey");
 +        getCache().getCacheTransactionManager().commit();
 +        return null;
 +      }
 +    });
 +    
 +    vm2.invoke(new SerializableCallable() {
 +      public Object call() throws Exception {
 +        Region r = getCache().getRegion(getTestMethodName());
 +        Object v = r.get("key1");
 +        assertNotNull(v);
 +        assertTrue(v instanceof byte[]);
 +        assertNull(r.get("ikey"));
 +        return null;
 +      }
 +    });
 +  }
 +}