You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:25 UTC

[07/51] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index d3b065c,0000000..7a4d09e
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@@ -1,9177 -1,0 +1,9177 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.cache30;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInput;
 +import java.io.DataInputStream;
 +import java.io.DataOutput;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Properties;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import junit.framework.Assert;
 +import junit.framework.AssertionFailedError;
 +
 +import org.apache.logging.log4j.Logger;
 +import org.junit.Ignore;
 +
 +import com.gemstone.gemfire.DataSerializable;
 +import com.gemstone.gemfire.DataSerializer;
 +import com.gemstone.gemfire.Instantiator;
 +import com.gemstone.gemfire.InvalidDeltaException;
 +import com.gemstone.gemfire.LogWriter;
 +import com.gemstone.gemfire.cache.AttributesFactory;
 +import com.gemstone.gemfire.cache.AttributesMutator;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheEvent;
 +import com.gemstone.gemfire.cache.CacheException;
 +import com.gemstone.gemfire.cache.CacheFactory;
 +import com.gemstone.gemfire.cache.CacheListener;
 +import com.gemstone.gemfire.cache.CacheLoader;
 +import com.gemstone.gemfire.cache.CacheLoaderException;
 +import com.gemstone.gemfire.cache.CacheTransactionManager;
 +import com.gemstone.gemfire.cache.CacheWriter;
 +import com.gemstone.gemfire.cache.CacheWriterException;
 +import com.gemstone.gemfire.cache.DataPolicy;
 +import com.gemstone.gemfire.cache.EntryEvent;
 +import com.gemstone.gemfire.cache.EntryExistsException;
 +import com.gemstone.gemfire.cache.EntryNotFoundException;
 +import com.gemstone.gemfire.cache.ExpirationAction;
 +import com.gemstone.gemfire.cache.ExpirationAttributes;
 +import com.gemstone.gemfire.cache.InterestPolicy;
 +import com.gemstone.gemfire.cache.LoaderHelper;
 +import com.gemstone.gemfire.cache.Operation;
 +import com.gemstone.gemfire.cache.Region;
 +import com.gemstone.gemfire.cache.RegionAttributes;
 +import com.gemstone.gemfire.cache.RegionEvent;
 +import com.gemstone.gemfire.cache.RegionFactory;
 +import com.gemstone.gemfire.cache.RegionShortcut;
 +import com.gemstone.gemfire.cache.Scope;
 +import com.gemstone.gemfire.cache.SubscriptionAttributes;
 +import com.gemstone.gemfire.cache.TimeoutException;
 +import com.gemstone.gemfire.cache.TransactionEvent;
 +import com.gemstone.gemfire.cache.TransactionId;
 +import com.gemstone.gemfire.cache.TransactionListener;
 +import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 +import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 +import com.gemstone.gemfire.distributed.internal.DMStats;
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.internal.AvailablePortHelper;
 +import com.gemstone.gemfire.internal.HeapDataOutputStream;
 +import com.gemstone.gemfire.internal.InternalDataSerializer;
 +import com.gemstone.gemfire.internal.InternalInstantiator;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.cache.EntryExpiryTask;
 +import com.gemstone.gemfire.internal.cache.ExpiryTask;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.internal.cache.LocalRegion;
 +import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 +import com.gemstone.gemfire.internal.cache.RegionEntry;
 +import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 +import com.gemstone.gemfire.internal.cache.TXStateProxy;
 +import com.gemstone.gemfire.internal.cache.Token;
 +import com.gemstone.gemfire.internal.cache.TombstoneService;
 +import com.gemstone.gemfire.internal.cache.delta.Delta;
 +import com.gemstone.gemfire.internal.cache.versions.RegionVersionHolder;
 +import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 +import com.gemstone.gemfire.internal.cache.versions.VMRegionVersionVector;
 +import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
 +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
 +import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 +import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 +import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
 +import com.gemstone.gemfire.test.dunit.Host;
 +import com.gemstone.gemfire.test.dunit.RMIException;
 +import com.gemstone.gemfire.test.dunit.SerializableCallable;
 +import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 +import com.gemstone.gemfire.test.dunit.ThreadUtils;
 +import com.gemstone.gemfire.test.dunit.VM;
 +import com.gemstone.gemfire.test.dunit.Wait;
 +import com.gemstone.gemfire.test.dunit.WaitCriterion;
 +import com.gemstone.gemfire.test.dunit.IgnoredException;
 +import com.gemstone.gemfire.test.dunit.Invoke;
 +
 +
 +/**
 + * Abstract superclass of {@link Region} tests that involve more than
 + * one VM.
 + */
 +public abstract class MultiVMRegionTestCase extends RegionTestCase {
 +
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  Properties props = new Properties();
 +
 +  final int putRange_1Start = 1;
 +
 +  final int putRange_1End = 5;
 +
 +  final int putRange_2Start = 6;
 +
 +  final int putRange_2End = 10;
 +
 +  final int putRange_3Start = 11;
 +
 +  final int putRange_3End = 15;
 +
 +  final int putRange_4Start = 16;
 +
 +  final int putRange_4End = 20;
 +
 +  final int removeRange_1Start = 2;
 +
 +  final int removeRange_1End = 4;
 +
 +  final int removeRange_2Start = 7;
 +
 +  final int removeRange_2End = 9;
 +
 +  public MultiVMRegionTestCase(String name) {
 +    super(name);
 +  }
 +  
 +  public static void caseTearDown() throws Exception {
 +    disconnectAllFromDS();
 +  }
 +  
 +  @Override
 +  protected final void postTearDownRegionTestCase() throws Exception {
 +    DistributedTestCase.cleanupAllVms();
 +    CCRegion = null;
 +  }
 +
 +  // @todo can be used in tests
 +//  protected CacheSerializableRunnable createRegionTask(final String name) {
 +//    return new CacheSerializableRunnable("Create Region") {
 +//      public void run2() throws CacheException {
 +//        assertNotNull(createRegion(name));
 +//      }
 +//    };
 +//  }
 +
 +
 +  ////////  Test Methods
 +
 +  /**
 +   * This is a for the ConcurrentMap operations.
 +   * 4 VMs are used to
 +   * create the region and operations are performed on one of the nodes
 +   */
 +  public void testConcurrentOperations() throws Exception {
 +    SerializableRunnable createRegion = new CacheSerializableRunnable(
 +    "createRegion") {
 +
 +      public void run2() throws CacheException {
 +        Cache cache = getCache();
 +        RegionAttributes regionAttribs = getRegionAttributes();
 +        cache.createRegion("R1",
 +            regionAttribs);
 +      }
 +    };
 +
 +    Host host = Host.getHost(0);
 +    // create the VM(0 - 4)
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +    VM vm3 = host.getVM(3);
 +    vm0.invoke(createRegion);
 +    vm1.invoke(createRegion);
 +    vm2.invoke(createRegion);
 +    vm3.invoke(createRegion);
 +    concurrentMapTest("/R1");
 +  }
 +
 +  /**
 +   * Do putIfAbsent(), replace(Object, Object),
 +   * replace(Object, Object, Object), remove(Object, Object) operations
 +   */
 +  public void concurrentMapTest(final String rName) {
 +    
 +    //String exceptionStr = "";
 +    VM vm0 = Host.getHost(0).getVM(0);
 +    vm0.invoke(new CacheSerializableRunnable("doConcurrentMapOperations") {
 +      public void run2() throws CacheException {
 +        Cache cache = getCache();
 +        final Region pr = cache.getRegion(rName);
 +        assertNotNull(rName + " not created", pr);
 +               
 +        // test successful putIfAbsent
 +        for (int i = putRange_1Start; i <= putRange_1End; i++) {
 +          Object putResult = pr.putIfAbsent(Integer.toString(i),
 +                                            Integer.toString(i));
 +          assertNull("Expected null, but got " + putResult + " for key " + i,
 +                     putResult);
 +        }
 +        int size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                    pr.isEmpty());
 +        
 +        // test unsuccessful putIfAbsent
 +        for (int i = putRange_1Start; i <= putRange_1End; i++) {
 +          Object putResult = pr.putIfAbsent(Integer.toString(i),
 +                                            Integer.toString(i + 1));
 +          assertEquals("for i=" + i, Integer.toString(i), putResult);
 +          assertEquals("for i=" + i, Integer.toString(i), pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                    pr.isEmpty());
 +               
 +        // test successful replace(key, oldValue, newValue)
 +        for (int i = putRange_1Start; i <= putRange_1End; i++) {
 +         boolean replaceSucceeded = pr.replace(Integer.toString(i),
 +                                               Integer.toString(i),
 +                                               "replaced" + i);
 +          assertTrue("for i=" + i, replaceSucceeded);
 +          assertEquals("for i=" + i, "replaced" + i, pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                   pr.isEmpty());
 +               
 +        // test unsuccessful replace(key, oldValue, newValue)
 +        for (int i = putRange_1Start; i <= putRange_2End; i++) {
 +         boolean replaceSucceeded = pr.replace(Integer.toString(i),
 +                                               Integer.toString(i), // wrong expected old value
 +                                               "not" + i);
 +         assertFalse("for i=" + i, replaceSucceeded);
 +         assertEquals("for i=" + i,
 +                      i <= putRange_1End ? "replaced" + i : null,
 +                      pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                   pr.isEmpty());
 +                                    
 +        // test successful replace(key, value)
 +        for (int i = putRange_1Start; i <= putRange_1End; i++) {
 +          Object replaceResult = pr.replace(Integer.toString(i),
 +                                            "twice replaced" + i);
 +          assertEquals("for i=" + i, "replaced" + i, replaceResult);
 +          assertEquals("for i=" + i,
 +                       "twice replaced" + i,
 +                       pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                    pr.isEmpty());
 +                                    
 +        // test unsuccessful replace(key, value)
 +        for (int i = putRange_2Start; i <= putRange_2End; i++) {
 +          Object replaceResult = pr.replace(Integer.toString(i),
 +                                           "thrice replaced" + i);
 +          assertNull("for i=" + i, replaceResult);
 +          assertNull("for i=" + i, pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                    pr.isEmpty());
 +                                    
 +        // test unsuccessful remove(key, value)
 +        for (int i = putRange_1Start; i <= putRange_2End; i++) {
 +          boolean removeResult = pr.remove(Integer.toString(i),
 +                                           Integer.toString(-i));
 +          assertFalse("for i=" + i, removeResult);
 +          assertEquals("for i=" + i,
 +                       i <= putRange_1End ? "twice replaced" + i : null,
 +                       pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", putRange_1End, size);
 +        assertFalse("isEmpty doesnt return proper state of the region", 
 +                    pr.isEmpty());
 +
 +        // test successful remove(key, value)
 +        for (int i = putRange_1Start; i <= putRange_1End; i++) {
 +          boolean removeResult = pr.remove(Integer.toString(i),
 +                                           "twice replaced" + i);
 +          assertTrue("for i=" + i, removeResult);
 +          assertEquals("for i=" + i, null, pr.get(Integer.toString(i)));
 +        }
 +        size = pr.size();
 +        assertEquals("Size doesn't return expected value", 0, size);
 +        assertTrue("isEmpty doesnt return proper state of the region", 
 +                 pr.isEmpty());
 +        }
 +    });
 +    
 +    
 +    /*
 +     * destroy the Region.
 +     */
 +    vm0.invoke(new CacheSerializableRunnable("destroyRegionOp") {
 +               
 +       public void run2() {
 +         Cache cache = getCache();
 +         Region pr = cache.getRegion(rName);
 +         assertNotNull("Region already destroyed.", pr);
 +         pr.destroyRegion();
 +         assertTrue("Region isDestroyed false", pr.isDestroyed());
 +         assertNull("Region not destroyed.", cache.getRegion(rName));
 +       }
 +     });
 +  }
 +
 +  /**
 +   * Tests that doing a {@link Region#put put} in a distributed region
 +   * one VM updates the value in another VM.
 +   */
 +  public void testDistributedUpdate() {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    final Object key = "KEY";
 +    final Object oldValue = "OLD_VALUE";
 +    final Object newValue = "NEW_VALUE";
 +
 +    SerializableRunnable put =
 +      new CacheSerializableRunnable("Put key/value") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            region.put(key, oldValue);
 +            flushIfNecessary(region);
 +          }
 +      };
 +
 +    vm0.invoke(put);
 +    vm1.invoke(put);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Update") {
 +        public void run2() throws CacheException {
 +          Region region = getRootRegion().getSubregion(name);
 +          region.put(key, newValue);
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Validate update") {
 +        public void run2() throws CacheException {
 +          Region region = getRootRegion().getSubregion(name);
 +          Region.Entry entry = region.getEntry(key);
 +          assertNotNull(entry);
 +          assertEquals(newValue, entry.getValue());
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Tests that distributed updates are delivered in order
 +   *
 +   * <P>
 +   *
 +   * Note that this test does not make sense for regions that are
 +   * {@link Scope#DISTRIBUTED_NO_ACK} for which we do not guarantee
 +   * the ordering of updates for a single producer/single consumer.
 +   *
 +   * DISABLED 4-16-04 - the current implementation assumes events
 +   * are processed synchronously, which is no longer true.
 +   */
 +  public void _ttestOrderedUpdates() throws Throwable {
 +    if (getRegionAttributes().getScope() ==
 +        Scope.DISTRIBUTED_NO_ACK) {
 +      return;
 +    }
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final int lastNumber = 10;
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create region entry") {
 +          public void run2() throws CacheException {
 +            Region region = createRegion(name);
 +            region.create(key, null);
 +          }
 +        };
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set listener") {
 +        public void run2() throws CacheException {
 +          Region region = getRootRegion().getSubregion(name);
 +          region.setUserAttribute(new LinkedBlockingQueue());
 +          region.getAttributesMutator().addCacheListener(new
 +            CacheListenerAdapter() {
 +              public void afterUpdate(EntryEvent e) {
 +                Region region2 = e.getRegion();
 +                LinkedBlockingQueue queue =
 +                  (LinkedBlockingQueue) region2.getUserAttribute();
 +                Object value = e.getNewValue();
 +                assertNotNull(value);
 +                try {
 +                  com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("++ Adding " + value);
 +                  queue.put(value);
 +
 +                } catch (InterruptedException ex) {
 +                  com.gemstone.gemfire.test.dunit.Assert.fail("Why was I interrupted?", ex);
 +                }
 +              }
 +            });
 +          flushIfNecessary(region);
 +        }
 +      });
 +    AsyncInvocation ai1 =
 +      vm1.invokeAsync(new CacheSerializableRunnable("Verify") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            LinkedBlockingQueue queue =
 +              (LinkedBlockingQueue) region.getUserAttribute();
 +            for (int i = 0; i <= lastNumber; i++) {
 +              try {
 +                com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("++ Waiting for " + i);
 +                Integer value = (Integer) queue.take();
 +                com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("++ Got " + value);
 +                assertEquals(i, value.intValue());
 +
 +              } catch (InterruptedException ex) {
 +                com.gemstone.gemfire.test.dunit.Assert.fail("Why was I interrupted?", ex);
 +              }
 +            }
 +          }
 +        });
 +
 +    AsyncInvocation ai0 =
 +      vm0.invokeAsync(new CacheSerializableRunnable("Populate") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            for (int i = 0; i <= lastNumber; i++) {
 +//              com.gemstone.gemfire.internal.GemFireVersion.waitForJavaDebugger(getLogWriter());
 +              region.put(key, new Integer(i));
 +            }
 +          }
 +        });
 +
 +    ThreadUtils.join(ai0, 30 * 1000);
 +    ThreadUtils.join(ai1, 30 * 1000);
 +
 +    if (ai0.exceptionOccurred()) {
 +      com.gemstone.gemfire.test.dunit.Assert.fail("ai0 failed", ai0.getException());
 +
 +    } else if (ai1.exceptionOccurred()) {
 +      com.gemstone.gemfire.test.dunit.Assert.fail("ai1 failed", ai1.getException());
 +    }
 +  }
 +
 +  /**
 +   * Tests that doing a distributed get results in a
 +   * <code>netSearch</code>.
 +   */
 +  public void testDistributedGet() {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Populate region") {
 +        public void run2() throws CacheException {
 +          Region region = createRegion(name);
 +          region.put(key, value);
 +        }
 +      });
 +
 +    SerializableRunnable get = new CacheSerializableRunnable("Distributed get") {
 +        public void run2() throws CacheException {
 +          Region region = createRegion(name);
 +          assertEquals(value, region.get(key));
 +        }
 +    };
 +
 +
 +    vm1.invoke(get);
 +  }
 +
 +  /**
 +   * Tests that doing a {@link Region#put put} on a distributed region
 +   * in one VM does not effect a region in a different VM that does
 +   * not have that key defined.
 +   */
 +  public void testDistributedPutNoUpdate()
 +    throws InterruptedException {
 +
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    Thread.sleep(250);
 +
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    vm0.invoke(new CacheSerializableRunnable("Put key/value") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.put(key, value);
 +        }
 +      });
 +
 +    Thread.sleep(250);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify no update") {
 +        public void run2() throws CacheException {
 +          Region region = getRootRegion().getSubregion(name);
 +          Region.Entry entry = region.getEntry(key);
 +          if (getRegionAttributes().getDataPolicy().withReplication() ||
 +              getRegionAttributes().getPartitionAttributes() != null) {
 +            assertEquals(value, region.get(key));
 +          }
 +          else {
 +            assertNull(entry);
 +          }
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Two VMs create a region. One populates a region entry.  The other
 +   * VM defines that entry.  The first VM updates the entry.  The
 +   * second VM should see the updated value.
 +   */
 +  public void testDefinedEntryUpdated() {
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object oldValue = "OLD_VALUE";
 +    final Object newValue = "NEW_VALUE";
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Create and populate") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.put(key, oldValue);
 +        }
 +      });
 +    vm1.invoke(new CacheSerializableRunnable("Define entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          if (!getRegionAttributes().getDataPolicy().withReplication())
 +            region.create(key, null);
 +        }
 +      });
 +    vm0.invoke(new CacheSerializableRunnable("Update entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.put(key, newValue);
 +        }
 +      });
 +    Invoke.invokeRepeatingIfNecessary(vm1, new CacheSerializableRunnable("Get entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          assertEquals(newValue, region.get(key));
 +        }
 +      }, getRepeatTimeoutMs());
 +  }
 +
 +
 +  /**
 +   * Tests that {@linkplain Region#destroy destroying} an entry is
 +   * propagated to all VMs that define that entry.
 +   */
 +  public void testDistributedDestroy() throws InterruptedException {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +//DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
 +            Region region = createRegion(name);
 +            assertTrue(!region.isDestroyed());
 +            Region root = region.getParentRegion();
 +            assertTrue(!root.isDestroyed());
 +          }
 +        };
 +
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +    vm2.invoke(create);
 +
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    SerializableRunnable put =
 +      new CacheSerializableRunnable("Put key/value") {
 +          public void run2() throws CacheException {
 +//DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to put");
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            region.put(key, value);
 +            assertTrue(!region.isDestroyed());
 +            assertTrue(!region.getParentRegion().isDestroyed());
 +            flushIfNecessary(region);
 +          }
 +        };
 +
 +    vm0.invoke(put);
 +    vm1.invoke(put);
 +    vm2.invoke(put);
 +
 +    SerializableRunnable verifyPut =
 +      new CacheSerializableRunnable("Verify Put") {
 +          public void run2() throws CacheException {
 +            Region root = getRootRegion();
 +            assertTrue(!root.isDestroyed());
 +            Region region = root.getSubregion(name);
 +            assertTrue(!region.isDestroyed());
 +//DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to get");
 +            assertEquals(value, region.getEntry(key).getValue());
 +          }
 +        };
 +
 +    vm0.invoke(verifyPut);
 +    vm1.invoke(verifyPut);
 +    vm2.invoke(verifyPut);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy Entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.destroy(key);
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    CacheSerializableRunnable verifyDestroy =
 +      new CacheSerializableRunnable("Verify entry destruction") {
 +          public void run2() throws CacheException {
 +            Region root = getRootRegion();
 +            assertTrue(!root.isDestroyed());
 +            Region region = root.getSubregion(name);
 +            assertTrue(!region.isDestroyed());
 +            assertNull(region.getEntry(key));
 +          }
 +        };
 +    vm0.invoke(verifyDestroy);
 +    vm1.invoke(verifyDestroy);
 +    vm2.invoke(verifyDestroy);
 +  }
 +
 +  /**
 +   * Tests that {@linkplain Region#destroy destroying} a region is
 +   * propagated to all VMs that define that region.
 +   */
 +  public void testDistributedRegionDestroy()
 +    throws InterruptedException {
 +
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Invoke.invokeInEveryVM(create);
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy Region") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.destroyRegion();
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    Invoke.invokeInEveryVM(new CacheSerializableRunnable("Verify region destruction") {
 +      public void run2() throws CacheException {
 +        WaitCriterion ev = new WaitCriterion() {
 +          public boolean done() {
 +            return getRootRegion().getSubregion(name) == null;
 +          }
 +          public String description() {
 +            return "Waiting for region " + name + " to be destroyed";
 +          }
 +        };
 +        Wait.waitForCriterion(ev, 60 * 1000, 10, true);
 +        Region region = getRootRegion().getSubregion(name);
 +        assertNull(region);
 +      }
 +    });
 +  }
 +
 +  /**
 +   * Tests that a {@linkplain Region#localDestroy} does not effect
 +   * other VMs that define that entry.
 +   */
 +  public void testLocalDestroy() throws InterruptedException {
 +    if (!supportsLocalDestroyAndLocalInvalidate()) {
 +      return;
 +    }
 +    // test not valid for persistBackup region since they have to be
 +    // mirrored KEYS_VALUES
 +    if (getRegionAttributes().getDataPolicy().withPersistence()) return;
 +
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    Thread.sleep(250);
 +
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    SerializableRunnable put =
 +      new CacheSerializableRunnable("Put key/value") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            region.put(key, value);
 +          }
 +        };
 +
 +    vm0.invoke(put);
 +    vm1.invoke(put);
 +
 +    Thread.sleep(250);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Local Destroy Entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.localDestroy(key);
 +        }
 +      });
 +
 +    Thread.sleep(250);
 +
 +    SerializableRunnable verify =
 +      new CacheSerializableRunnable("Verify entry existence") {
 +          public void run2() throws CacheException {
 +            Region region = getRootRegion().getSubregion(name);
 +            assertNotNull(region.getEntry(key));
 +          }
 +        };
 +    vm1.invoke(verify);
 +  }
 +
 +  /**
 +   * Tests that a {@link Region#localDestroyRegion} is not propagated
 +   * to other VMs that define that region.
 +   */
 +  public void testLocalRegionDestroy()
 +    throws InterruptedException {
 +
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    Thread.sleep(250);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Local Destroy Region") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.localDestroyRegion();
 +        }
 +      });
 +
 +    Thread.sleep(250);
 +
 +    SerializableRunnable verify =
 +      new CacheSerializableRunnable("Verify region existence") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            assertNotNull(region);
 +          }
 +        };
 +    vm1.invoke(verify);
 +  }
 +
 +  /**
 +   * Tests that {@linkplain Region#invalidate invalidating} an entry is
 +   * propagated to all VMs that define that entry.
 +   */
 +  public void testDistributedInvalidate() {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +    VM vm2 = host.getVM(2);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    // vm2 is on a different gemfire system
 +    vm2.invoke(create);
 +
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    SerializableRunnable put =
 +      new CacheSerializableRunnable("Put key/value") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            region.put(key, value);
 +            flushIfNecessary(region);
 +          }
 +        };
 +
 +    vm0.invoke(put);
 +    vm1.invoke(put);
 +    vm2.invoke(put);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Invalidate Entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.invalidate(key);
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    CacheSerializableRunnable verify =
 +      new CacheSerializableRunnable("Verify entry invalidation") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            Region.Entry entry = region.getEntry(key);
 +            assertNotNull(entry);
 +            if (entry.getValue() != null) {
 +              // changed from severe to fine because it is possible
 +              // for this to return non-null on d-no-ack
 +              // that is was invokeRepeatingIfNecessary is called
 +              com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().fine("invalidated entry has value of " + entry.getValue());
 +            }
 +            assertNull(entry.getValue());
 +          }
 +        };
 +
 +
 +    vm1.invoke(verify);
 +    vm2.invoke(verify);
 +  }
 +
 +  /**
 +   * Tests that {@linkplain Region#invalidate invalidating} an entry
 +   * in multiple VMs does not cause any problems.
 +   */
 +  public void testDistributedInvalidate4() throws InterruptedException {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    int vmCount = host.getVMCount();
 +    for (int i = 0; i < vmCount; i++) {
 +      VM vm = host.getVM(i);
 +      vm.invoke(create);
 +    }
 +
 +    SerializableRunnable put =
 +        new CacheSerializableRunnable("put entry") {
 +            public void run2() throws CacheException {
 +              Region region =
 +                  getRootRegion().getSubregion(name);
 +              region.put(key, value);
 +              flushIfNecessary(region);
 +            }
 +          };
 +
 +      for (int i = 0; i < vmCount; i++) {
 +        VM vm = host.getVM(i);
 +        vm.invoke(put);
 +      }
 +
 +      SerializableRunnable invalidate =
 +      new CacheSerializableRunnable("Invalidate Entry") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.invalidate(key);
 +          flushIfNecessary(region);
 +        }
 +      };
 +
 +    for (int i = 0; i < vmCount; i++) {
 +      VM vm = host.getVM(i);
 +      vm.invoke(invalidate);
 +    }
 +
 +    SerializableRunnable verify =
 +      new CacheSerializableRunnable("Verify entry invalidation") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            Region.Entry entry = region.getEntry(key);
 +            assertNotNull(entry);
 +            assertNull(entry.getValue());
 +          }
 +        };
 +
 +    for (int i = 0; i < vmCount; i++) {
 +      VM vm = host.getVM(i);
 +      vm.invoke(verify);
 +    }
 +  }
 +
 +  /**
 +   * Tests that {@linkplain Region#invalidateRegion invalidating} a
 +   * region is propagated to all VMs that define that entry.
 +   */
 +  public void testDistributedRegionInvalidate()
 +    throws InterruptedException {
 +    if (!supportsSubregions()) {
 +      return;
 +    }
 +    final String name = this.getUniqueName();
 +    final String subname = "sub";
 +    final boolean useSubs = getRegionAttributes().getPartitionAttributes() == null;
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            Region region;
 +            region = createRegion(name);
 +            if (useSubs) {
 +              region.createSubregion(subname, region.getAttributes());
 +            }
 +          }
 +        };
 +
 +    Invoke.invokeInEveryVM(create);
 +
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +    final Object key2 = "KEY2";
 +    final Object value2 = "VALUE2";
 +
 +    SerializableRunnable put =
 +      new CacheSerializableRunnable("Put key/value") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            region.put(key, value);
 +            region.put(key2, value2);
 +            flushIfNecessary(region);
 +
 +            if (useSubs) {
 +              Region subregion = region.getSubregion(subname);
 +              subregion.put(key, value);
 +              subregion.put(key2, value2);
 +              flushIfNecessary(subregion);
 +            }
 +          }
 +        };
 +
 +    Invoke.invokeInEveryVM(put);
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Invalidate Region") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.invalidateRegion();
 +        }
 +      });
 +
 +    CacheSerializableRunnable verify =
 +      new CacheSerializableRunnable("Verify region invalidation") {
 +          public void run2() throws CacheException {
 +            Region region =
 +              getRootRegion().getSubregion(name);
 +            {
 +              Region.Entry entry = region.getEntry(key);
 +              assertNotNull(entry);
 +              Object v = entry.getValue();
 +              assertNull("Expected null but was " + v, v);
 +
 +              entry = region.getEntry(key2);
 +              assertNotNull(entry);
 +              assertNull(entry.getValue());
 +            }
 +
 +            if (useSubs) {
 +              Region subregion = region.getSubregion(subname);
 +              Region.Entry entry = subregion.getEntry(key);
 +              assertNotNull(entry);
 +              assertNull(entry.getValue());
 +
 +              entry = subregion.getEntry(key2);
 +              assertNotNull(entry);
 +              assertNull(entry.getValue());
 +            }
 +          }
 +        };
 +
 +    Invoke.invokeInEveryVMRepeatingIfNecessary(verify, getRepeatTimeoutMs());
 +  }
 +
 +  /**
 +   * Tests that a {@link CacheListener} is invoked in a remote VM.
 +   */
 +  public void testRemoteCacheListener() throws InterruptedException {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object oldValue = "OLD_VALUE";
 +    final Object newValue = "NEW_VALUE";
 +//    final Object key2 = "KEY2";
 +//    final Object value2 = "VALUE2";
 +
 +    SerializableRunnable populate =
 +      new CacheSerializableRunnable("Create Region and Put") {
 +          public void run2() throws CacheException {
 +            Region region = createRegion(name);
 +            region.put(key, oldValue);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    final VM vm0 = host.getVM(0);
 +    final VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(populate);
 +    vm1.invoke(populate);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set listener") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterUpdate2(EntryEvent event) {
 +                assertEquals(Operation.UPDATE, event.getOperation());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +                assertEquals(key, event.getKey());
 +                assertEquals(oldValue, event.getOldValue());
 +                assertEquals(newValue, event.getNewValue());
 +                assertFalse(event.getOperation().isLoad());
 +                assertFalse(event.getOperation().isLocalLoad());
 +                assertFalse(event.getOperation().isNetLoad());
 +                assertFalse(event.getOperation().isNetSearch());
 +                if (event.getRegion().getAttributes().getOffHeap()) {
 +                  // since off heap always serializes the old value is serialized and available
 +                  assertEquals(oldValue, event.getSerializedOldValue().getDeserializedValue());
 +                } else {
 +                  assertEquals(null, event.getSerializedOldValue()); // since it was put originally in this VM
 +                }
 +                DataInputStream dis = new DataInputStream(new ByteArrayInputStream(event.getSerializedNewValue().getSerializedValue()));
 +                try {
 +                  assertEquals(newValue, DataSerializer.readObject(dis));
 +                } catch (Exception e) {
 +                  com.gemstone.gemfire.test.dunit.Assert.fail("Unexpected Exception", e);
 +                }
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    // I see no reason to pause here.
 +    // The test used to pause here but only if no-ack.
 +    // But we have no operations to wait for.
 +    // The last thing we did was install a listener in vm1
 +    // and it is possible that vm0 does not yet know we have
 +    // a listener but for this test it does not matter.
 +    // So I'm commenting out the following pause:
 +    //pauseIfNecessary();
 +    // If needed then do a flushIfNecessary(region) after adding the cache listener
 +
 +    vm0.invoke(new CacheSerializableRunnable("Update") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.put(key, newValue, getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Update") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +
 +          // Setup listener for next test
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterInvalidate2(EntryEvent event) {
 +                assertEquals(Operation.INVALIDATE, event.getOperation());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +                assertEquals(key, event.getKey());
 +                assertEquals(newValue, event.getOldValue());
 +                assertNull(event.getNewValue());
 +                assertFalse(event.getOperation().isLoad());
 +                assertFalse(event.getOperation().isLocalLoad());
 +                assertFalse(event.getOperation().isNetLoad());
 +                assertFalse(event.getOperation().isNetSearch());
 +                assertNull(event.getSerializedNewValue());
 +                DataInputStream dis = new DataInputStream(new ByteArrayInputStream(event.getSerializedOldValue().getSerializedValue()));
 +                try {
 +                  assertEquals(newValue, DataSerializer.readObject(dis));
 +                } catch (Exception e) {
 +                  com.gemstone.gemfire.test.dunit.Assert.fail("Unexpected Exception", e);
 +                }
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Invalidate") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.invalidate(key, getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Invalidate") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +
 +          // Setup listener for next test
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterDestroy2(EntryEvent event) {
 +                assertTrue(event.getOperation().isDestroy());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +                assertEquals(key, event.getKey());
 +                assertNull(event.getOldValue());
 +                assertNull(event.getNewValue());
 +                assertFalse(event.getOperation().isLoad());
 +                assertFalse(event.getOperation().isLocalLoad());
 +                assertFalse(event.getOperation().isNetLoad());
 +                assertFalse(event.getOperation().isNetSearch());
 +                assertNull(event.getSerializedOldValue());
 +                assertNull(event.getSerializedNewValue());
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.destroy(key, getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Destroy") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +
 +          // Setup listener for next test
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterRegionInvalidate2(RegionEvent event) {
 +                assertEquals(Operation.REGION_INVALIDATE, event.getOperation());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Invalidate Region") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.invalidateRegion(getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Invalidate Region") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +
 +          // Setup listener for next test
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterRegionDestroy2(RegionEvent event) {
 +                assertEquals(Operation.REGION_DESTROY, event.getOperation());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy Region") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.destroyRegion(getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Destroy Region") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +        }
 +      });
 +  }
 +
 +
 +  /**
 +   * Tests that a {@link CacheListener} is invoked in a remote VM.
 +   */
 +  public void testRemoteCacheListenerInSubregion() throws InterruptedException {
 +    if (!supportsSubregions()) {
 +      return;
 +    }
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    final VM vm0 = host.getVM(0);
 +    final VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Create Root") {
 +      public void run2() throws CacheException {
 +        createRootRegion();
 +      }
 +    });
 +
 +    vm1.invoke(create);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set listener") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterRegionInvalidate2(RegionEvent event) {
 +                assertEquals(Operation.REGION_INVALIDATE, event.getOperation());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Invalidate Root Region") {
 +        public void run2() throws CacheException {
 +          getRootRegion().invalidateRegion(getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Invalidate Region") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +
 +          // Setup listener for next test
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          listener = new TestCacheListener() {
 +              public void afterRegionDestroy2(RegionEvent event) {
 +                assertEquals(Operation.REGION_DESTROY, event.getOperation());
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(event.getCallbackArgument(), event.getDistributedMember());
 +              }
 +            };
 +          region.getAttributesMutator().addCacheListener(listener);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy Root Region") {
 +        public void run2() throws CacheException {
 +          getRootRegion().destroyRegion(getSystem().getDistributedMember());
 +        }
 +      });
 +
 +    vm1.invoke(new CacheSerializableRunnable("Verify Destroy Region") {
 +        public void run2() throws CacheException {
 +          listener.waitForInvocation(3000, 10);
 +        }
 +      });
 +  }
 +
 +
 +  /**
 +   * Indicate whether this region supports netload
 +   * @return true if it supports netload
 +   */
 +  protected boolean supportsNetLoad() {
 +    return true;
 +  }
 +  
 +  /**
 +   * Tests that a {@link CacheLoader} is invoked in a remote VM.  This
 +   * essentially tests <code>netLoad</code>.
 +   */
 +  public void testRemoteCacheLoader() throws InterruptedException {
 +    if (!supportsNetLoad()) {
 +      return;
 +    }
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set CacheLoader") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          loader = new TestCacheLoader() {
 +              public Object load2(LoaderHelper helper)
 +                throws CacheLoaderException {
 +                assertEquals(region, helper.getRegion());
 +                assertEquals(key, helper.getKey());
 +                assertNull(helper.getArgument());
 +
 +                return value;
 +              }
 +            };
 +          region.getAttributesMutator().setCacheLoader(loader);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Remote load") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          assertEquals(value, region.get(key));
 +        }
 +      });
 +
 +    vm1.invoke(new SerializableRunnable("Verify loader") {
 +        public void run() {
 +          assertTrue(loader.wasInvoked());
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Tests that the parameter passed to a remote {@link CacheLoader}
 +   * is actually passed.
 +   */
 +  public void testRemoteCacheLoaderArg() throws InterruptedException {
 +    if (!supportsNetLoad()) {
 +      return;
 +    }
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +    final String arg = "ARG";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +            // Can't test non-Serializable callback argument here
 +            // because netLoad will not occur because there are no
 +            // other members with the region defined when it is
 +            // created.  Hooray for intelligent messaging.
 +          }
 +        };
 +
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set CacheLoader") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          loader = new TestCacheLoader() {
 +              public Object load2(LoaderHelper helper)
 +                throws CacheLoaderException {
 +                assertEquals(region, helper.getRegion());
 +                assertEquals(key, helper.getKey());
 +                assertEquals(arg, helper.getArgument());
 +
 +                return value;
 +              }
 +            };
 +          region.getAttributesMutator().setCacheLoader(loader);
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Remote load") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +
 +          try {
 +            // Use a non-serializable arg object
 +            region.get(key, new Object() { });
 +            fail("Should have thrown an IllegalArgumentException");
 +
 +          } catch (IllegalArgumentException ex) {
 +            // pass...
 +          }
 +          assertNull(region.getEntry(key));
 +          try {
 +           assertEquals(value, region.get(key, arg));
 +          }
 +          catch(IllegalArgumentException e) {}
 +       }
 +      });
 +
 +    vm1.invoke(new SerializableRunnable("Verify loader") {
 +        public void run() {
 +          assertTrue(loader.wasInvoked());
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Tests that a remote {@link CacheLoader} that throws a {@link
 +   * CacheLoaderException} results is propagated back to the caller.
 +   */
 +  public void testRemoteCacheLoaderException() throws InterruptedException {
 +    if (!supportsNetLoad()) {
 +      return;
 +    }
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +//    final Object value = "VALUE";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set CacheLoader") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          loader = new TestCacheLoader() {
 +              public Object load2(LoaderHelper helper)
 +                throws CacheLoaderException {
 +                assertEquals(region, helper.getRegion());
 +                assertEquals(key, helper.getKey());
 +                assertNull(helper.getArgument());
 +
 +                String s = "Test Exception";
 +                throw new CacheLoaderException(s);
 +              }
 +            };
 +          region.getAttributesMutator().setCacheLoader(loader);
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Remote load") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          try {
 +            region.get(key);
 +            fail("Should have thrown a CacheLoaderException");
 +
 +          } catch (CacheLoaderException ex) {
 +            // pass...
 +          }
 +        }
 +      });
 +
 +    vm1.invoke(new SerializableRunnable("Verify loader") {
 +        public void run() {
 +          assertTrue(loader.wasInvoked());
 +        }
 +      });
 +  }
 +
 +
 +  public void testCacheLoaderWithNetSearch() throws CacheException {
 +    if (!supportsNetLoad()) {
 +      return;
 +    }
 +    // some tests use mirroring by default (e.g. persistBackup regions)
 +    // if so, then this test won't work right
 +    if (getRegionAttributes().getDataPolicy().withReplication()
 +        || getRegionAttributes().getDataPolicy().isPreloaded()) {
 +      return;
 +    }
 +
 +    final String name = this.getUniqueName();
 +    final Object key = this.getUniqueName();
 +    final Object value = new Integer(42);
 +
 +    Host host = Host.getHost(0);
 +    // use vm on other gemfire system
 +    VM vm1 = host.getVM(1);
 +    vm1.invoke(new CacheSerializableRunnable("set remote value") {
 +      public void run2() throws CacheException {
 +//        final TestCacheLoader remoteloader = new TestCacheLoader() {
 +//            public Object load2(LoaderHelper helper)
 +//              throws CacheLoaderException {
 +//
 +//              assertEquals(key, helper.getKey());
 +//              assertEquals(name, helper.getRegion().getName());
 +//              return value;
 +//            }
 +//          };
 +//
 +//        AttributesFactory factory =
 +//          new AttributesFactory(getRegionAttributes());
 +//        factory.setCacheLoader(remoteloader);
 +        Region rgn = createRegion(name);
 +        rgn.put(key, value);
 +        flushIfNecessary(rgn);
 +      }
 +    });
 +
 +    final TestCacheLoader loader1 = new TestCacheLoader() {
 +        public Object load2(LoaderHelper helper)
 +          throws CacheLoaderException {
 +
 +          assertEquals(key, helper.getKey());
 +          assertEquals(name, helper.getRegion().getName());
 +
 +          try {
 +            helper.getRegion().getAttributes();
 +            Object result = helper.netSearch(false);
 +            assertEquals(value, result);
 +            return result;
 +          } catch (TimeoutException ex) {
 +            com.gemstone.gemfire.test.dunit.Assert.fail("Why did I time out?", ex);
 +          }
 +          return null;
 +        }
 +      };
 +
 +    AttributesFactory f = new AttributesFactory(getRegionAttributes());
 +    f.setCacheLoader(loader1);
 +    Region region =
 +      createRegion(name, f.create());
 +
 +    loader1.wasInvoked();
 +
 +    Region.Entry entry = region.getEntry(key);
 +    assertNull(entry);
 +    region.create(key, null);
 +
 +    entry = region.getEntry(key);
 +    assertNotNull(entry);
 +    assertNull(entry.getValue());
 +
 +    // make sure value is still there in vm1
 +    vm1.invoke(new CacheSerializableRunnable("verify remote value") {
 +      public void run2() throws CacheException {
 +        Region rgn = getRootRegion().getSubregion(name);
 +        assertEquals(value, rgn.getEntry(key).getValue());
 +      }
 +    });
 +
 +//    com.gemstone.gemfire.internal.util.DebuggerSupport.waitForJavaDebugger(getLogWriter());
 +    assertEquals(value, region.get(key));
 +    // if global scope, then a netSearch is done BEFORE the loader is invoked,
 +    // so we get the value but the loader is never invoked.
 +    if (region.getAttributes().getScope().isGlobal()) {
 +      assertTrue(!loader1.wasInvoked());
 +    }
 +    else {
 +      assertTrue(loader1.wasInvoked());
 +    }
 +    assertEquals(value, region.getEntry(key).getValue());
 +  }
 +
 +
 +  public void testCacheLoaderWithNetLoad() throws CacheException {
 +
 +
 +    // replicated regions and partitioned regions make no sense for this
 +    // test
 +    if (getRegionAttributes().getDataPolicy().withReplication() ||
 +        getRegionAttributes().getDataPolicy().isPreloaded() ||
 +        getRegionAttributes().getPartitionAttributes() != null)
 +    {
 +      return;
 +    }
 +
 +    final String name = this.getUniqueName();
 +    final Object key = this.getUniqueName();
 +    final Object value = new Integer(42);
 +
 +    Host host = Host.getHost(0);
 +    // use vm on other gemfire system
 +    VM vm1 = host.getVM(1);
 +    vm1.invoke(new CacheSerializableRunnable("set up remote loader") {
 +      public void run2() throws CacheException {
 +        final TestCacheLoader remoteloader = new TestCacheLoader() {
 +            public Object load2(LoaderHelper helper)
 +              throws CacheLoaderException {
 +
 +              assertEquals(key, helper.getKey());
 +              assertEquals(name, helper.getRegion().getName());
 +              return value;
 +            }
 +          };
 +
 +        AttributesFactory factory =
 +          new AttributesFactory(getRegionAttributes());
 +        factory.setCacheLoader(remoteloader);
 +        createRegion(name, factory.create());
 +      }
 +    });
 +
 +    final TestCacheLoader loader1 = new TestCacheLoader() {
 +        public Object load2(LoaderHelper helper)
 +          throws CacheLoaderException {
 +
 +          assertEquals(key, helper.getKey());
 +          assertEquals(name, helper.getRegion().getName());
 +
 +          try {
 +            helper.getRegion().getAttributes();
 +            Object result = helper.netSearch(true);
 +            assertEquals(value, result);
 +            return result;
 +          } catch (TimeoutException ex) {
 +            com.gemstone.gemfire.test.dunit.Assert.fail("Why did I time out?", ex);
 +          }
 +          return null;
 +        }
 +      };
 +
 +    AttributesFactory f = new AttributesFactory(getRegionAttributes());
 +    f.setCacheLoader(loader1);
 +    Region region = createRegion(name, f.create());
 +
 +    loader1.wasInvoked();
 +
 +    Region.Entry entry = region.getEntry(key);
 +    assertNull(entry);
 +
 +    region.create(key, null);
 +
 +    entry = region.getEntry(key);
 +    assertNotNull(entry);
 +    assertNull(entry.getValue());
 +
 +//    com.gemstone.gemfire.internal.util.DebuggerSupport.waitForJavaDebugger(getLogWriter());
 +    assertEquals(value, region.get(key));
 +
 +    assertTrue(loader1.wasInvoked());
 +    assertEquals(value, region.getEntry(key).getValue());
 +  }
 +
 +
 +  /**
 +   * Tests that {@link Region#get} returns <code>null</code> when
 +   * there is no remote loader.
 +   */
 +  public void testNoRemoteCacheLoader() throws InterruptedException {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +//    final Object value = "VALUE";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            createRegion(name);
 +          }
 +        };
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Remote load") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          assertNull(region.get(key));
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Tests that a remote <code>CacheLoader</code> is not invoked if
 +   * the remote region has an invalid entry (that is, a key, but no
 +   * value).
 +   */
 +  public void testNoLoaderWithInvalidEntry() {
 +    if (!supportsNetLoad()) {
 +      return;
 +    }
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object value = "VALUE";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            Region region = createRegion(name);
 +            loader = new TestCacheLoader() {
 +                public Object load2(LoaderHelper helper)
 +                  throws CacheLoaderException {
 +
 +                  return value;
 +                }
 +              };
 +            region.getAttributesMutator().setCacheLoader(loader);
 +          }
 +        };
 +
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    vm1.invoke(new CacheSerializableRunnable("Create invalid entry") {
 +      public void run2() throws CacheException {
 +        Region region =
 +          getRootRegion().getSubregion(name);
 +          region.create(key, null);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Remote get") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +// DebuggerSupport.waitForJavaDebugger(getLogWriter());
 +          assertEquals(value, region.get(key));
 +          assertTrue(loader.wasInvoked());
 +        }
 +      });
 +
 +    vm1.invoke(new SerializableRunnable("Verify loader") {
 +        public void run() {
 +          assertFalse(loader.wasInvoked());
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Tests that a remote {@link CacheWriter} is invoked and that
 +   * <code>CacheWriter</code> arguments and {@link
 +   * CacheWriterException}s are propagated appropriately.
 +   */
 +  public void testRemoteCacheWriter() throws InterruptedException {
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object oldValue = "OLD_VALUE";
 +    final Object newValue = "NEW_VALUE";
 +    final Object arg = "ARG";
 +    final Object exception = "EXCEPTION";
 +
 +    final Object key2 = "KEY2";
 +    final Object value2 = "VALUE2";
 +
 +    SerializableRunnable create =
 +      new CacheSerializableRunnable("Create Region") {
 +          public void run2() throws CacheException {
 +            Region region = createRegion(name);
 +
 +            // Put key2 in the region before any callbacks are
 +            // registered, so it can be destroyed later
 +            region.put(key2, value2);
 +            assertEquals(1, region.size());
 +            if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +              GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +              SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +              LocalRegion reRegion;
 +              reRegion = (LocalRegion) region;
 +              RegionEntry re = reRegion.getRegionEntry(key2);
 +              MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) re._getValue();
 +              assertEquals(1, mc.getRefCount());
 +              assertEquals(1, ma.getStats().getObjects());
 +            }
 +          }
 +        };
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(create);
 +    vm1.invoke(create);
 +
 +    ////////  Create
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set Writer") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          writer = new TestCacheWriter() {
 +              public void beforeCreate2(EntryEvent event)
 +                throws CacheWriterException {
 +
 +                if (exception.equals(event.getCallbackArgument())) {
 +                  String s = "Test Exception";
 +                  throw new CacheWriterException(s);
 +                }
 +
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isCreate());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(key, event.getKey());
 +                assertEquals(null, event.getOldValue());
 +                assertEquals(oldValue, event.getNewValue());
 +                assertFalse(event.getOperation().isLoad());
 +                assertFalse(event.getOperation().isLocalLoad());
 +                assertFalse(event.getOperation().isNetLoad());
 +                assertFalse(event.getOperation().isNetSearch());
 +
 +              }
 +            };
 +          region.getAttributesMutator().setCacheWriter(writer);
 +          flushIfNecessary(region);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Create with Exception") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          try {
 +            region.put(key, oldValue, exception);
 +            fail("Should have thrown a CacheWriterException");
 +
 +          } catch (CacheWriterException ex) {
 +            assertNull(region.getEntry(key));
 +            assertEquals(1, region.size());
 +            if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +              GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +              SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +              assertEquals(1, ma.getStats().getObjects());
 +            }
 +          }
 +        }
 +      });
 +
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Create with Argument") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.put(key, oldValue, arg);
 +          assertEquals(2, region.size());
 +          if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +            GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +            SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +            assertEquals(2, ma.getStats().getObjects());
 +            LocalRegion reRegion;
 +            reRegion = (LocalRegion) region;
 +            MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
 +            assertEquals(1, mc.getRefCount());
 +          }
 +        }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    ////////  Update
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set Writer") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          writer = new TestCacheWriter() {
 +              public void beforeUpdate2(EntryEvent event)
 +                throws CacheWriterException {
 +
 +                Object argument = event.getCallbackArgument();
 +                if (exception.equals(argument)) {
 +                  String s = "Test Exception";
 +                  throw new CacheWriterException(s);
 +                }
 +
 +                assertEquals(arg, argument);
 +
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isUpdate());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(key, event.getKey());
 +                assertEquals(oldValue, event.getOldValue());
 +                assertEquals(newValue, event.getNewValue());
 +                assertFalse(event.getOperation().isLoad());
 +                assertFalse(event.getOperation().isLocalLoad());
 +                assertFalse(event.getOperation().isNetLoad());
 +                assertFalse(event.getOperation().isNetSearch());
 +
 +              }
 +            };
 +          region.getAttributesMutator().setCacheWriter(writer);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Update with Exception") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          try {
 +            region.put(key, newValue, exception);
 +            fail("Should have thrown a CacheWriterException");
 +
 +          } catch (CacheWriterException ex) {
 +            Region.Entry entry = region.getEntry(key);
 +            assertEquals(oldValue, entry.getValue());
 +            assertEquals(2, region.size());
 +            if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +              GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +              SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +              assertEquals(2, ma.getStats().getObjects());
 +              LocalRegion reRegion;
 +              reRegion = (LocalRegion) region;
 +              MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
 +              assertEquals(1, mc.getRefCount());
 +            }
 +          }
 +        }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Update with Argument") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.put(key, newValue, arg);
 +          assertEquals(2, region.size());
 +          if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +            GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +            SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +            assertEquals(2, ma.getStats().getObjects());
 +          }
 +        }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    ////////  Destroy
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set Writer") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          writer = new TestCacheWriter() {
 +              public void beforeDestroy2(EntryEvent event)
 +                throws CacheWriterException {
 +
 +                Object argument = event.getCallbackArgument();
 +                if (exception.equals(argument)) {
 +                  String s = "Test Exception";
 +                  throw new CacheWriterException(s);
 +                }
 +
 +                assertEquals(arg, argument);
 +
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isDestroy());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +                assertEquals(key, event.getKey());
 +                assertEquals(newValue, event.getOldValue());
 +                assertNull(event.getNewValue());
 +                assertFalse(event.getOperation().isLoad());
 +                assertFalse(event.getOperation().isLocalLoad());
 +                assertFalse(event.getOperation().isNetLoad());
 +                assertFalse(event.getOperation().isNetSearch());
 +              }
 +            };
 +          region.getAttributesMutator().setCacheWriter(writer);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy with Exception") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          try {
 +            region.destroy(key, exception);
 +            fail("Should have thrown a CacheWriterException");
 +
 +          } catch (CacheWriterException ex) {
 +            assertNotNull(region.getEntry(key));
 +            assertEquals(2, region.size());
 +            if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +              GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +              SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +              assertEquals(2, ma.getStats().getObjects());
 +            }
 +          }
 +        }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy with Argument") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          region.destroy(key, arg);
 +          assertEquals(1, region.size());
 +          if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +            GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +            SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +            assertEquals(1, ma.getStats().getObjects());
 +          }
 +       }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    ////////  Region Destroy
 +
 +    vm1.invoke(new CacheSerializableRunnable("Set Writer") {
 +        public void run2() throws CacheException {
 +          final Region region =
 +            getRootRegion().getSubregion(name);
 +          writer = new TestCacheWriter() {
 +              public void beforeRegionDestroy2(RegionEvent event)
 +                throws CacheWriterException {
 +
 +                Object argument = event.getCallbackArgument();
 +                if (exception.equals(argument)) {
 +                  String s = "Test Exception";
 +                  throw new CacheWriterException(s);
 +                }
 +
 +                assertEquals(arg, argument);
 +
 +                assertEquals(region, event.getRegion());
 +                assertTrue(event.getOperation().isRegionDestroy());
 +                assertTrue(event.getOperation().isDistributed());
 +                assertFalse(event.getOperation().isExpiration());
 +                assertTrue(event.isOriginRemote());
 +              }
 +            };
 +          region.getAttributesMutator().setCacheWriter(writer);
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy with Exception") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          try {
 +            region.destroyRegion(exception);
 +            fail("Should have thrown a CacheWriterException");
 +
 +          } catch (CacheWriterException ex) {
 +            if (region.isDestroyed()) {
 +              com.gemstone.gemfire.test.dunit.Assert.fail("should not have an exception if region is destroyed", ex);
 +            }
 +            assertEquals(1, region.size());
 +            if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +              GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +              SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +              assertEquals(1, ma.getStats().getObjects());
 +            }
 +          }
 +        }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +
 +    vm0.invoke(new CacheSerializableRunnable("Destroy with Argument") {
 +        public void run2() throws CacheException {
 +          Region region =
 +            getRootRegion().getSubregion(name);
 +          assertEquals(1, region.size());
 +          if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +            GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +            SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +            assertEquals(1, ma.getStats().getObjects());
 +          }
 +          region.destroyRegion(arg);
 +          if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
 +            GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
 +            final SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
 +            WaitCriterion waitForStatChange = new WaitCriterion() {
 +              public boolean done() {
 +                return ma.getStats().getObjects() == 0;
 +              }
 +              public String description() {
 +                return "never saw off-heap object count go to zero. Last value was " + ma.getStats().getObjects();
 +              }
 +            };
 +            Wait.waitForCriterion(waitForStatChange, 3000, 10, true);
 +          }
 +        }
 +      });
 +    vm1.invoke(new SerializableRunnable("Verify callback") {
 +        public void run() {
 +          assertTrue(writer.wasInvoked());
 +        }
 +      });
 +  }
 +
 +  /**
 +   * Tests that, when given a choice, a local <code>CacheWriter</code>
 +   * is invoked instead of a remote one.
 +   */
 +  public void testLocalAndRemoteCacheWriters()
 +    throws InterruptedException {
 +
 +    assertTrue(getRegionAttributes().getScope().isDistributed());
 +
 +    final String name = this.getUniqueName();
 +    final Object key = "KEY";
 +    final Object oldValue = "OLD_VALUE";
 +    final Object newValue = "NEW_VALUE";
 +
 +    Host host = Host.getHost(0);
 +    VM vm0 = host.getVM(0);
 +    VM vm1 = host.getVM(1);
 +
 +    vm0.invoke(new CacheSerializableRunnable("Create \"Local\" Region") {
 +        public void run2() throws CacheException {
 +          Region region = createRegion(name);
 +          writer = new TestCacheWriter() {
 +              public void beforeUpdate2(EntryEvent event)
 +                throws CacheWriterException { }
 +
 +              p

<TRUNCATED>