You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:35:29 UTC
[07/83] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --cc geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index d3b065c,0000000..7a4d09e
mode 100644,000000..100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@@ -1,9177 -1,0 +1,9177 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.Instantiator;
+import com.gemstone.gemfire.InvalidDeltaException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.AttributesMutator;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheEvent;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.CacheWriter;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.LoaderHelper;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionEvent;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.TransactionEvent;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.TransactionListener;
+import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.InternalInstantiator;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.EntryExpiryTask;
+import com.gemstone.gemfire.internal.cache.ExpiryTask;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+import com.gemstone.gemfire.internal.cache.TXStateProxy;
+import com.gemstone.gemfire.internal.cache.Token;
+import com.gemstone.gemfire.internal.cache.TombstoneService;
+import com.gemstone.gemfire.internal.cache.delta.Delta;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionHolder;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VMRegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.RMIException;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.Invoke;
+
+
+/**
+ * Abstract superclass of {@link Region} tests that involve more than
+ * one VM.
+ */
+public abstract class MultiVMRegionTestCase extends RegionTestCase {
+
+ private static final Logger logger = LogService.getLogger();
+
+ Properties props = new Properties();
+
+ final int putRange_1Start = 1;
+
+ final int putRange_1End = 5;
+
+ final int putRange_2Start = 6;
+
+ final int putRange_2End = 10;
+
+ final int putRange_3Start = 11;
+
+ final int putRange_3End = 15;
+
+ final int putRange_4Start = 16;
+
+ final int putRange_4End = 20;
+
+ final int removeRange_1Start = 2;
+
+ final int removeRange_1End = 4;
+
+ final int removeRange_2Start = 7;
+
+ final int removeRange_2End = 9;
+
+ public MultiVMRegionTestCase(String name) {
+ super(name);
+ }
+
+ public static void caseTearDown() throws Exception {
+ disconnectAllFromDS();
+ }
+
+ @Override
+ protected final void postTearDownRegionTestCase() throws Exception {
+ DistributedTestCase.cleanupAllVms();
+ CCRegion = null;
+ }
+
+ // @todo can be used in tests
+// protected CacheSerializableRunnable createRegionTask(final String name) {
+// return new CacheSerializableRunnable("Create Region") {
+// public void run2() throws CacheException {
+// assertNotNull(createRegion(name));
+// }
+// };
+// }
+
+
+ //////// Test Methods
+
+ /**
+ * This is a for the ConcurrentMap operations.
+ * 4 VMs are used to
+ * create the region and operations are performed on one of the nodes
+ */
+ public void testConcurrentOperations() throws Exception {
+ SerializableRunnable createRegion = new CacheSerializableRunnable(
+ "createRegion") {
+
+ public void run2() throws CacheException {
+ Cache cache = getCache();
+ RegionAttributes regionAttribs = getRegionAttributes();
+ cache.createRegion("R1",
+ regionAttribs);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ // create the VM(0 - 4)
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+ vm0.invoke(createRegion);
+ vm1.invoke(createRegion);
+ vm2.invoke(createRegion);
+ vm3.invoke(createRegion);
+ concurrentMapTest("/R1");
+ }
+
+ /**
+ * Do putIfAbsent(), replace(Object, Object),
+ * replace(Object, Object, Object), remove(Object, Object) operations
+ */
+ public void concurrentMapTest(final String rName) {
+
+ //String exceptionStr = "";
+ VM vm0 = Host.getHost(0).getVM(0);
+ vm0.invoke(new CacheSerializableRunnable("doConcurrentMapOperations") {
+ public void run2() throws CacheException {
+ Cache cache = getCache();
+ final Region pr = cache.getRegion(rName);
+ assertNotNull(rName + " not created", pr);
+
+ // test successful putIfAbsent
+ for (int i = putRange_1Start; i <= putRange_1End; i++) {
+ Object putResult = pr.putIfAbsent(Integer.toString(i),
+ Integer.toString(i));
+ assertNull("Expected null, but got " + putResult + " for key " + i,
+ putResult);
+ }
+ int size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test unsuccessful putIfAbsent
+ for (int i = putRange_1Start; i <= putRange_1End; i++) {
+ Object putResult = pr.putIfAbsent(Integer.toString(i),
+ Integer.toString(i + 1));
+ assertEquals("for i=" + i, Integer.toString(i), putResult);
+ assertEquals("for i=" + i, Integer.toString(i), pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test successful replace(key, oldValue, newValue)
+ for (int i = putRange_1Start; i <= putRange_1End; i++) {
+ boolean replaceSucceeded = pr.replace(Integer.toString(i),
+ Integer.toString(i),
+ "replaced" + i);
+ assertTrue("for i=" + i, replaceSucceeded);
+ assertEquals("for i=" + i, "replaced" + i, pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test unsuccessful replace(key, oldValue, newValue)
+ for (int i = putRange_1Start; i <= putRange_2End; i++) {
+ boolean replaceSucceeded = pr.replace(Integer.toString(i),
+ Integer.toString(i), // wrong expected old value
+ "not" + i);
+ assertFalse("for i=" + i, replaceSucceeded);
+ assertEquals("for i=" + i,
+ i <= putRange_1End ? "replaced" + i : null,
+ pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test successful replace(key, value)
+ for (int i = putRange_1Start; i <= putRange_1End; i++) {
+ Object replaceResult = pr.replace(Integer.toString(i),
+ "twice replaced" + i);
+ assertEquals("for i=" + i, "replaced" + i, replaceResult);
+ assertEquals("for i=" + i,
+ "twice replaced" + i,
+ pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test unsuccessful replace(key, value)
+ for (int i = putRange_2Start; i <= putRange_2End; i++) {
+ Object replaceResult = pr.replace(Integer.toString(i),
+ "thrice replaced" + i);
+ assertNull("for i=" + i, replaceResult);
+ assertNull("for i=" + i, pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test unsuccessful remove(key, value)
+ for (int i = putRange_1Start; i <= putRange_2End; i++) {
+ boolean removeResult = pr.remove(Integer.toString(i),
+ Integer.toString(-i));
+ assertFalse("for i=" + i, removeResult);
+ assertEquals("for i=" + i,
+ i <= putRange_1End ? "twice replaced" + i : null,
+ pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", putRange_1End, size);
+ assertFalse("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+
+ // test successful remove(key, value)
+ for (int i = putRange_1Start; i <= putRange_1End; i++) {
+ boolean removeResult = pr.remove(Integer.toString(i),
+ "twice replaced" + i);
+ assertTrue("for i=" + i, removeResult);
+ assertEquals("for i=" + i, null, pr.get(Integer.toString(i)));
+ }
+ size = pr.size();
+ assertEquals("Size doesn't return expected value", 0, size);
+ assertTrue("isEmpty doesnt return proper state of the region",
+ pr.isEmpty());
+ }
+ });
+
+
+ /*
+ * destroy the Region.
+ */
+ vm0.invoke(new CacheSerializableRunnable("destroyRegionOp") {
+
+ public void run2() {
+ Cache cache = getCache();
+ Region pr = cache.getRegion(rName);
+ assertNotNull("Region already destroyed.", pr);
+ pr.destroyRegion();
+ assertTrue("Region isDestroyed false", pr.isDestroyed());
+ assertNull("Region not destroyed.", cache.getRegion(rName));
+ }
+ });
+ }
+
+ /**
+ * Tests that doing a {@link Region#put put} in a distributed region
+ * one VM updates the value in another VM.
+ */
+ public void testDistributedUpdate() {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ final Object key = "KEY";
+ final Object oldValue = "OLD_VALUE";
+ final Object newValue = "NEW_VALUE";
+
+ SerializableRunnable put =
+ new CacheSerializableRunnable("Put key/value") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, oldValue);
+ flushIfNecessary(region);
+ }
+ };
+
+ vm0.invoke(put);
+ vm1.invoke(put);
+
+ vm0.invoke(new CacheSerializableRunnable("Update") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.put(key, newValue);
+ flushIfNecessary(region);
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Validate update") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ Region.Entry entry = region.getEntry(key);
+ assertNotNull(entry);
+ assertEquals(newValue, entry.getValue());
+ }
+ });
+ }
+
+ /**
+ * Tests that distributed updates are delivered in order
+ *
+ * <P>
+ *
+ * Note that this test does not make sense for regions that are
+ * {@link Scope#DISTRIBUTED_NO_ACK} for which we do not guarantee
+ * the ordering of updates for a single producer/single consumer.
+ *
+ * DISABLED 4-16-04 - the current implementation assumes events
+ * are processed synchronously, which is no longer true.
+ */
+ public void _ttestOrderedUpdates() throws Throwable {
+ if (getRegionAttributes().getScope() ==
+ Scope.DISTRIBUTED_NO_ACK) {
+ return;
+ }
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final int lastNumber = 10;
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create region entry") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+ region.create(key, null);
+ }
+ };
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ vm1.invoke(new CacheSerializableRunnable("Set listener") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.setUserAttribute(new LinkedBlockingQueue());
+ region.getAttributesMutator().addCacheListener(new
+ CacheListenerAdapter() {
+ public void afterUpdate(EntryEvent e) {
+ Region region2 = e.getRegion();
+ LinkedBlockingQueue queue =
+ (LinkedBlockingQueue) region2.getUserAttribute();
+ Object value = e.getNewValue();
+ assertNotNull(value);
+ try {
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("++ Adding " + value);
+ queue.put(value);
+
+ } catch (InterruptedException ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Why was I interrupted?", ex);
+ }
+ }
+ });
+ flushIfNecessary(region);
+ }
+ });
+ AsyncInvocation ai1 =
+ vm1.invokeAsync(new CacheSerializableRunnable("Verify") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ LinkedBlockingQueue queue =
+ (LinkedBlockingQueue) region.getUserAttribute();
+ for (int i = 0; i <= lastNumber; i++) {
+ try {
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("++ Waiting for " + i);
+ Integer value = (Integer) queue.take();
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("++ Got " + value);
+ assertEquals(i, value.intValue());
+
+ } catch (InterruptedException ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Why was I interrupted?", ex);
+ }
+ }
+ }
+ });
+
+ AsyncInvocation ai0 =
+ vm0.invokeAsync(new CacheSerializableRunnable("Populate") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ for (int i = 0; i <= lastNumber; i++) {
+// com.gemstone.gemfire.internal.GemFireVersion.waitForJavaDebugger(getLogWriter());
+ region.put(key, new Integer(i));
+ }
+ }
+ });
+
+ ThreadUtils.join(ai0, 30 * 1000);
+ ThreadUtils.join(ai1, 30 * 1000);
+
+ if (ai0.exceptionOccurred()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("ai0 failed", ai0.getException());
+
+ } else if (ai1.exceptionOccurred()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("ai1 failed", ai1.getException());
+ }
+ }
+
+ /**
+ * Tests that doing a distributed get results in a
+ * <code>netSearch</code>.
+ */
+ public void testDistributedGet() {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(new CacheSerializableRunnable("Populate region") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+ region.put(key, value);
+ }
+ });
+
+ SerializableRunnable get = new CacheSerializableRunnable("Distributed get") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+ assertEquals(value, region.get(key));
+ }
+ };
+
+
+ vm1.invoke(get);
+ }
+
+ /**
+ * Tests that doing a {@link Region#put put} on a distributed region
+ * in one VM does not effect a region in a different VM that does
+ * not have that key defined.
+ */
+ public void testDistributedPutNoUpdate()
+ throws InterruptedException {
+
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ Thread.sleep(250);
+
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ vm0.invoke(new CacheSerializableRunnable("Put key/value") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, value);
+ }
+ });
+
+ Thread.sleep(250);
+
+ vm1.invoke(new CacheSerializableRunnable("Verify no update") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ Region.Entry entry = region.getEntry(key);
+ if (getRegionAttributes().getDataPolicy().withReplication() ||
+ getRegionAttributes().getPartitionAttributes() != null) {
+ assertEquals(value, region.get(key));
+ }
+ else {
+ assertNull(entry);
+ }
+ }
+ });
+ }
+
+ /**
+ * Two VMs create a region. One populates a region entry. The other
+ * VM defines that entry. The first VM updates the entry. The
+ * second VM should see the updated value.
+ */
+ public void testDefinedEntryUpdated() {
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object oldValue = "OLD_VALUE";
+ final Object newValue = "NEW_VALUE";
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ vm0.invoke(new CacheSerializableRunnable("Create and populate") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, oldValue);
+ }
+ });
+ vm1.invoke(new CacheSerializableRunnable("Define entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ if (!getRegionAttributes().getDataPolicy().withReplication())
+ region.create(key, null);
+ }
+ });
+ vm0.invoke(new CacheSerializableRunnable("Update entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, newValue);
+ }
+ });
+ Invoke.invokeRepeatingIfNecessary(vm1, new CacheSerializableRunnable("Get entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ assertEquals(newValue, region.get(key));
+ }
+ }, getRepeatTimeoutMs());
+ }
+
+
+ /**
+ * Tests that {@linkplain Region#destroy destroying} an entry is
+ * propagated to all VMs that define that entry.
+ */
+ public void testDistributedDestroy() throws InterruptedException {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+//DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
+ Region region = createRegion(name);
+ assertTrue(!region.isDestroyed());
+ Region root = region.getParentRegion();
+ assertTrue(!root.isDestroyed());
+ }
+ };
+
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+ vm2.invoke(create);
+
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ SerializableRunnable put =
+ new CacheSerializableRunnable("Put key/value") {
+ public void run2() throws CacheException {
+//DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to put");
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, value);
+ assertTrue(!region.isDestroyed());
+ assertTrue(!region.getParentRegion().isDestroyed());
+ flushIfNecessary(region);
+ }
+ };
+
+ vm0.invoke(put);
+ vm1.invoke(put);
+ vm2.invoke(put);
+
+ SerializableRunnable verifyPut =
+ new CacheSerializableRunnable("Verify Put") {
+ public void run2() throws CacheException {
+ Region root = getRootRegion();
+ assertTrue(!root.isDestroyed());
+ Region region = root.getSubregion(name);
+ assertTrue(!region.isDestroyed());
+//DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to get");
+ assertEquals(value, region.getEntry(key).getValue());
+ }
+ };
+
+ vm0.invoke(verifyPut);
+ vm1.invoke(verifyPut);
+ vm2.invoke(verifyPut);
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy Entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.destroy(key);
+ flushIfNecessary(region);
+ }
+ });
+
+ CacheSerializableRunnable verifyDestroy =
+ new CacheSerializableRunnable("Verify entry destruction") {
+ public void run2() throws CacheException {
+ Region root = getRootRegion();
+ assertTrue(!root.isDestroyed());
+ Region region = root.getSubregion(name);
+ assertTrue(!region.isDestroyed());
+ assertNull(region.getEntry(key));
+ }
+ };
+ vm0.invoke(verifyDestroy);
+ vm1.invoke(verifyDestroy);
+ vm2.invoke(verifyDestroy);
+ }
+
+ /**
+ * Tests that {@linkplain Region#destroy destroying} a region is
+ * propagated to all VMs that define that region.
+ */
+ public void testDistributedRegionDestroy()
+ throws InterruptedException {
+
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Invoke.invokeInEveryVM(create);
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy Region") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.destroyRegion();
+ flushIfNecessary(region);
+ }
+ });
+
+ Invoke.invokeInEveryVM(new CacheSerializableRunnable("Verify region destruction") {
+ public void run2() throws CacheException {
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return getRootRegion().getSubregion(name) == null;
+ }
+ public String description() {
+ return "Waiting for region " + name + " to be destroyed";
+ }
+ };
+ Wait.waitForCriterion(ev, 60 * 1000, 10, true);
+ Region region = getRootRegion().getSubregion(name);
+ assertNull(region);
+ }
+ });
+ }
+
+ /**
+ * Tests that a {@linkplain Region#localDestroy} does not effect
+ * other VMs that define that entry.
+ */
+ public void testLocalDestroy() throws InterruptedException {
+ if (!supportsLocalDestroyAndLocalInvalidate()) {
+ return;
+ }
+ // test not valid for persistBackup region since they have to be
+ // mirrored KEYS_VALUES
+ if (getRegionAttributes().getDataPolicy().withPersistence()) return;
+
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ Thread.sleep(250);
+
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ SerializableRunnable put =
+ new CacheSerializableRunnable("Put key/value") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, value);
+ }
+ };
+
+ vm0.invoke(put);
+ vm1.invoke(put);
+
+ Thread.sleep(250);
+
+ vm0.invoke(new CacheSerializableRunnable("Local Destroy Entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.localDestroy(key);
+ }
+ });
+
+ Thread.sleep(250);
+
+ SerializableRunnable verify =
+ new CacheSerializableRunnable("Verify entry existence") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ assertNotNull(region.getEntry(key));
+ }
+ };
+ vm1.invoke(verify);
+ }
+
+ /**
+ * Tests that a {@link Region#localDestroyRegion} is not propagated
+ * to other VMs that define that region.
+ */
+ public void testLocalRegionDestroy()
+ throws InterruptedException {
+
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ Thread.sleep(250);
+
+ vm0.invoke(new CacheSerializableRunnable("Local Destroy Region") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ });
+
+ Thread.sleep(250);
+
+ SerializableRunnable verify =
+ new CacheSerializableRunnable("Verify region existence") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ assertNotNull(region);
+ }
+ };
+ vm1.invoke(verify);
+ }
+
+ /**
+ * Tests that {@linkplain Region#invalidate invalidating} an entry is
+ * propagated to all VMs that define that entry.
+ */
+ public void testDistributedInvalidate() {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ // vm2 is on a different gemfire system
+ vm2.invoke(create);
+
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ SerializableRunnable put =
+ new CacheSerializableRunnable("Put key/value") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, value);
+ flushIfNecessary(region);
+ }
+ };
+
+ vm0.invoke(put);
+ vm1.invoke(put);
+ vm2.invoke(put);
+
+ vm0.invoke(new CacheSerializableRunnable("Invalidate Entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.invalidate(key);
+ flushIfNecessary(region);
+ }
+ });
+
+ CacheSerializableRunnable verify =
+ new CacheSerializableRunnable("Verify entry invalidation") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ Region.Entry entry = region.getEntry(key);
+ assertNotNull(entry);
+ if (entry.getValue() != null) {
+ // changed from severe to fine because it is possible
+ // for this to return non-null on d-no-ack
+ // that is was invokeRepeatingIfNecessary is called
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().fine("invalidated entry has value of " + entry.getValue());
+ }
+ assertNull(entry.getValue());
+ }
+ };
+
+
+ vm1.invoke(verify);
+ vm2.invoke(verify);
+ }
+
+ /**
+ * Tests that {@linkplain Region#invalidate invalidating} an entry
+ * in multiple VMs does not cause any problems.
+ */
+ public void testDistributedInvalidate4() throws InterruptedException {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ int vmCount = host.getVMCount();
+ for (int i = 0; i < vmCount; i++) {
+ VM vm = host.getVM(i);
+ vm.invoke(create);
+ }
+
+ SerializableRunnable put =
+ new CacheSerializableRunnable("put entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, value);
+ flushIfNecessary(region);
+ }
+ };
+
+ for (int i = 0; i < vmCount; i++) {
+ VM vm = host.getVM(i);
+ vm.invoke(put);
+ }
+
+ SerializableRunnable invalidate =
+ new CacheSerializableRunnable("Invalidate Entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.invalidate(key);
+ flushIfNecessary(region);
+ }
+ };
+
+ for (int i = 0; i < vmCount; i++) {
+ VM vm = host.getVM(i);
+ vm.invoke(invalidate);
+ }
+
+ SerializableRunnable verify =
+ new CacheSerializableRunnable("Verify entry invalidation") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ Region.Entry entry = region.getEntry(key);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+ }
+ };
+
+ for (int i = 0; i < vmCount; i++) {
+ VM vm = host.getVM(i);
+ vm.invoke(verify);
+ }
+ }
+
+ /**
+ * Tests that {@linkplain Region#invalidateRegion invalidating} a
+ * region is propagated to all VMs that define that entry.
+ */
+ public void testDistributedRegionInvalidate()
+ throws InterruptedException {
+ if (!supportsSubregions()) {
+ return;
+ }
+ final String name = this.getUniqueName();
+ final String subname = "sub";
+ final boolean useSubs = getRegionAttributes().getPartitionAttributes() == null;
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ Region region;
+ region = createRegion(name);
+ if (useSubs) {
+ region.createSubregion(subname, region.getAttributes());
+ }
+ }
+ };
+
+ Invoke.invokeInEveryVM(create);
+
+ final Object key = "KEY";
+ final Object value = "VALUE";
+ final Object key2 = "KEY2";
+ final Object value2 = "VALUE2";
+
+ SerializableRunnable put =
+ new CacheSerializableRunnable("Put key/value") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, value);
+ region.put(key2, value2);
+ flushIfNecessary(region);
+
+ if (useSubs) {
+ Region subregion = region.getSubregion(subname);
+ subregion.put(key, value);
+ subregion.put(key2, value2);
+ flushIfNecessary(subregion);
+ }
+ }
+ };
+
+ Invoke.invokeInEveryVM(put);
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+
+ vm0.invoke(new CacheSerializableRunnable("Invalidate Region") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.invalidateRegion();
+ }
+ });
+
+ CacheSerializableRunnable verify =
+ new CacheSerializableRunnable("Verify region invalidation") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ {
+ Region.Entry entry = region.getEntry(key);
+ assertNotNull(entry);
+ Object v = entry.getValue();
+ assertNull("Expected null but was " + v, v);
+
+ entry = region.getEntry(key2);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+ }
+
+ if (useSubs) {
+ Region subregion = region.getSubregion(subname);
+ Region.Entry entry = subregion.getEntry(key);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+
+ entry = subregion.getEntry(key2);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+ }
+ }
+ };
+
+ Invoke.invokeInEveryVMRepeatingIfNecessary(verify, getRepeatTimeoutMs());
+ }
+
+ /**
+ * Tests that a {@link CacheListener} is invoked in a remote VM.
+ */
+ public void testRemoteCacheListener() throws InterruptedException {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object oldValue = "OLD_VALUE";
+ final Object newValue = "NEW_VALUE";
+// final Object key2 = "KEY2";
+// final Object value2 = "VALUE2";
+
+ SerializableRunnable populate =
+ new CacheSerializableRunnable("Create Region and Put") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+ region.put(key, oldValue);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+
+ vm0.invoke(populate);
+ vm1.invoke(populate);
+
+ vm1.invoke(new CacheSerializableRunnable("Set listener") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterUpdate2(EntryEvent event) {
+ assertEquals(Operation.UPDATE, event.getOperation());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ assertEquals(key, event.getKey());
+ assertEquals(oldValue, event.getOldValue());
+ assertEquals(newValue, event.getNewValue());
+ assertFalse(event.getOperation().isLoad());
+ assertFalse(event.getOperation().isLocalLoad());
+ assertFalse(event.getOperation().isNetLoad());
+ assertFalse(event.getOperation().isNetSearch());
+ if (event.getRegion().getAttributes().getOffHeap()) {
+ // since off heap always serializes the old value is serialized and available
+ assertEquals(oldValue, event.getSerializedOldValue().getDeserializedValue());
+ } else {
+ assertEquals(null, event.getSerializedOldValue()); // since it was put originally in this VM
+ }
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(event.getSerializedNewValue().getSerializedValue()));
+ try {
+ assertEquals(newValue, DataSerializer.readObject(dis));
+ } catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Unexpected Exception", e);
+ }
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ // I see no reason to pause here.
+ // The test used to pause here but only if no-ack.
+ // But we have no operations to wait for.
+ // The last thing we did was install a listener in vm1
+ // and it is possible that vm0 does not yet know we have
+ // a listener but for this test it does not matter.
+ // So I'm commenting out the following pause:
+ //pauseIfNecessary();
+ // If needed then do a flushIfNecessary(region) after adding the cache listener
+
+ vm0.invoke(new CacheSerializableRunnable("Update") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, newValue, getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Update") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+
+ // Setup listener for next test
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterInvalidate2(EntryEvent event) {
+ assertEquals(Operation.INVALIDATE, event.getOperation());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ assertEquals(key, event.getKey());
+ assertEquals(newValue, event.getOldValue());
+ assertNull(event.getNewValue());
+ assertFalse(event.getOperation().isLoad());
+ assertFalse(event.getOperation().isLocalLoad());
+ assertFalse(event.getOperation().isNetLoad());
+ assertFalse(event.getOperation().isNetSearch());
+ assertNull(event.getSerializedNewValue());
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(event.getSerializedOldValue().getSerializedValue()));
+ try {
+ assertEquals(newValue, DataSerializer.readObject(dis));
+ } catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Unexpected Exception", e);
+ }
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Invalidate") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.invalidate(key, getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Invalidate") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+
+ // Setup listener for next test
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterDestroy2(EntryEvent event) {
+ assertTrue(event.getOperation().isDestroy());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ assertEquals(key, event.getKey());
+ assertNull(event.getOldValue());
+ assertNull(event.getNewValue());
+ assertFalse(event.getOperation().isLoad());
+ assertFalse(event.getOperation().isLocalLoad());
+ assertFalse(event.getOperation().isNetLoad());
+ assertFalse(event.getOperation().isNetSearch());
+ assertNull(event.getSerializedOldValue());
+ assertNull(event.getSerializedNewValue());
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.destroy(key, getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Destroy") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+
+ // Setup listener for next test
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterRegionInvalidate2(RegionEvent event) {
+ assertEquals(Operation.REGION_INVALIDATE, event.getOperation());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Invalidate Region") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.invalidateRegion(getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Invalidate Region") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+
+ // Setup listener for next test
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterRegionDestroy2(RegionEvent event) {
+ assertEquals(Operation.REGION_DESTROY, event.getOperation());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy Region") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.destroyRegion(getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Destroy Region") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+ }
+ });
+ }
+
+
+ /**
+ * Tests that a {@link CacheListener} is invoked in a remote VM.
+ */
+ public void testRemoteCacheListenerInSubregion() throws InterruptedException {
+ if (!supportsSubregions()) {
+ return;
+ }
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Root") {
+ public void run2() throws CacheException {
+ createRootRegion();
+ }
+ });
+
+ vm1.invoke(create);
+
+ vm1.invoke(new CacheSerializableRunnable("Set listener") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterRegionInvalidate2(RegionEvent event) {
+ assertEquals(Operation.REGION_INVALIDATE, event.getOperation());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Invalidate Root Region") {
+ public void run2() throws CacheException {
+ getRootRegion().invalidateRegion(getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Invalidate Region") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+
+ // Setup listener for next test
+ final Region region =
+ getRootRegion().getSubregion(name);
+ listener = new TestCacheListener() {
+ public void afterRegionDestroy2(RegionEvent event) {
+ assertEquals(Operation.REGION_DESTROY, event.getOperation());
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(event.getCallbackArgument(), event.getDistributedMember());
+ }
+ };
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy Root Region") {
+ public void run2() throws CacheException {
+ getRootRegion().destroyRegion(getSystem().getDistributedMember());
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify Destroy Region") {
+ public void run2() throws CacheException {
+ listener.waitForInvocation(3000, 10);
+ }
+ });
+ }
+
+
+ /**
+ * Indicate whether this region supports netload
+ * @return true if it supports netload
+ */
+ protected boolean supportsNetLoad() {
+ return true;
+ }
+
+ /**
+ * Tests that a {@link CacheLoader} is invoked in a remote VM. This
+ * essentially tests <code>netLoad</code>.
+ */
+ public void testRemoteCacheLoader() throws InterruptedException {
+ if (!supportsNetLoad()) {
+ return;
+ }
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+
+ vm1.invoke(new CacheSerializableRunnable("Set CacheLoader") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ loader = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+ assertEquals(region, helper.getRegion());
+ assertEquals(key, helper.getKey());
+ assertNull(helper.getArgument());
+
+ return value;
+ }
+ };
+ region.getAttributesMutator().setCacheLoader(loader);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Remote load") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ assertEquals(value, region.get(key));
+ }
+ });
+
+ vm1.invoke(new SerializableRunnable("Verify loader") {
+ public void run() {
+ assertTrue(loader.wasInvoked());
+ }
+ });
+ }
+
+ /**
+ * Tests that the parameter passed to a remote {@link CacheLoader}
+ * is actually passed.
+ */
+ public void testRemoteCacheLoaderArg() throws InterruptedException {
+ if (!supportsNetLoad()) {
+ return;
+ }
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object value = "VALUE";
+ final String arg = "ARG";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ // Can't test non-Serializable callback argument here
+ // because netLoad will not occur because there are no
+ // other members with the region defined when it is
+ // created. Hooray for intelligent messaging.
+ }
+ };
+
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ vm1.invoke(new CacheSerializableRunnable("Set CacheLoader") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ loader = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+ assertEquals(region, helper.getRegion());
+ assertEquals(key, helper.getKey());
+ assertEquals(arg, helper.getArgument());
+
+ return value;
+ }
+ };
+ region.getAttributesMutator().setCacheLoader(loader);
+ flushIfNecessary(region);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Remote load") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+
+ try {
+ // Use a non-serializable arg object
+ region.get(key, new Object() { });
+ fail("Should have thrown an IllegalArgumentException");
+
+ } catch (IllegalArgumentException ex) {
+ // pass...
+ }
+ assertNull(region.getEntry(key));
+ try {
+ assertEquals(value, region.get(key, arg));
+ }
+ catch(IllegalArgumentException e) {}
+ }
+ });
+
+ vm1.invoke(new SerializableRunnable("Verify loader") {
+ public void run() {
+ assertTrue(loader.wasInvoked());
+ }
+ });
+ }
+
+ /**
+ * Tests that a remote {@link CacheLoader} that throws a {@link
+ * CacheLoaderException} results is propagated back to the caller.
+ */
+ public void testRemoteCacheLoaderException() throws InterruptedException {
+ if (!supportsNetLoad()) {
+ return;
+ }
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+// final Object value = "VALUE";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ vm1.invoke(new CacheSerializableRunnable("Set CacheLoader") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ loader = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+ assertEquals(region, helper.getRegion());
+ assertEquals(key, helper.getKey());
+ assertNull(helper.getArgument());
+
+ String s = "Test Exception";
+ throw new CacheLoaderException(s);
+ }
+ };
+ region.getAttributesMutator().setCacheLoader(loader);
+ flushIfNecessary(region);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Remote load") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ try {
+ region.get(key);
+ fail("Should have thrown a CacheLoaderException");
+
+ } catch (CacheLoaderException ex) {
+ // pass...
+ }
+ }
+ });
+
+ vm1.invoke(new SerializableRunnable("Verify loader") {
+ public void run() {
+ assertTrue(loader.wasInvoked());
+ }
+ });
+ }
+
+
+ public void testCacheLoaderWithNetSearch() throws CacheException {
+ if (!supportsNetLoad()) {
+ return;
+ }
+ // some tests use mirroring by default (e.g. persistBackup regions)
+ // if so, then this test won't work right
+ if (getRegionAttributes().getDataPolicy().withReplication()
+ || getRegionAttributes().getDataPolicy().isPreloaded()) {
+ return;
+ }
+
+ final String name = this.getUniqueName();
+ final Object key = this.getUniqueName();
+ final Object value = new Integer(42);
+
+ Host host = Host.getHost(0);
+ // use vm on other gemfire system
+ VM vm1 = host.getVM(1);
+ vm1.invoke(new CacheSerializableRunnable("set remote value") {
+ public void run2() throws CacheException {
+// final TestCacheLoader remoteloader = new TestCacheLoader() {
+// public Object load2(LoaderHelper helper)
+// throws CacheLoaderException {
+//
+// assertEquals(key, helper.getKey());
+// assertEquals(name, helper.getRegion().getName());
+// return value;
+// }
+// };
+//
+// AttributesFactory factory =
+// new AttributesFactory(getRegionAttributes());
+// factory.setCacheLoader(remoteloader);
+ Region rgn = createRegion(name);
+ rgn.put(key, value);
+ flushIfNecessary(rgn);
+ }
+ });
+
+ final TestCacheLoader loader1 = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+
+ assertEquals(key, helper.getKey());
+ assertEquals(name, helper.getRegion().getName());
+
+ try {
+ helper.getRegion().getAttributes();
+ Object result = helper.netSearch(false);
+ assertEquals(value, result);
+ return result;
+ } catch (TimeoutException ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Why did I time out?", ex);
+ }
+ return null;
+ }
+ };
+
+ AttributesFactory f = new AttributesFactory(getRegionAttributes());
+ f.setCacheLoader(loader1);
+ Region region =
+ createRegion(name, f.create());
+
+ loader1.wasInvoked();
+
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+ region.create(key, null);
+
+ entry = region.getEntry(key);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+
+ // make sure value is still there in vm1
+ vm1.invoke(new CacheSerializableRunnable("verify remote value") {
+ public void run2() throws CacheException {
+ Region rgn = getRootRegion().getSubregion(name);
+ assertEquals(value, rgn.getEntry(key).getValue());
+ }
+ });
+
+// com.gemstone.gemfire.internal.util.DebuggerSupport.waitForJavaDebugger(getLogWriter());
+ assertEquals(value, region.get(key));
+ // if global scope, then a netSearch is done BEFORE the loader is invoked,
+ // so we get the value but the loader is never invoked.
+ if (region.getAttributes().getScope().isGlobal()) {
+ assertTrue(!loader1.wasInvoked());
+ }
+ else {
+ assertTrue(loader1.wasInvoked());
+ }
+ assertEquals(value, region.getEntry(key).getValue());
+ }
+
+
+ public void testCacheLoaderWithNetLoad() throws CacheException {
+
+
+ // replicated regions and partitioned regions make no sense for this
+ // test
+ if (getRegionAttributes().getDataPolicy().withReplication() ||
+ getRegionAttributes().getDataPolicy().isPreloaded() ||
+ getRegionAttributes().getPartitionAttributes() != null)
+ {
+ return;
+ }
+
+ final String name = this.getUniqueName();
+ final Object key = this.getUniqueName();
+ final Object value = new Integer(42);
+
+ Host host = Host.getHost(0);
+ // use vm on other gemfire system
+ VM vm1 = host.getVM(1);
+ vm1.invoke(new CacheSerializableRunnable("set up remote loader") {
+ public void run2() throws CacheException {
+ final TestCacheLoader remoteloader = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+
+ assertEquals(key, helper.getKey());
+ assertEquals(name, helper.getRegion().getName());
+ return value;
+ }
+ };
+
+ AttributesFactory factory =
+ new AttributesFactory(getRegionAttributes());
+ factory.setCacheLoader(remoteloader);
+ createRegion(name, factory.create());
+ }
+ });
+
+ final TestCacheLoader loader1 = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+
+ assertEquals(key, helper.getKey());
+ assertEquals(name, helper.getRegion().getName());
+
+ try {
+ helper.getRegion().getAttributes();
+ Object result = helper.netSearch(true);
+ assertEquals(value, result);
+ return result;
+ } catch (TimeoutException ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Why did I time out?", ex);
+ }
+ return null;
+ }
+ };
+
+ AttributesFactory f = new AttributesFactory(getRegionAttributes());
+ f.setCacheLoader(loader1);
+ Region region = createRegion(name, f.create());
+
+ loader1.wasInvoked();
+
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+
+ region.create(key, null);
+
+ entry = region.getEntry(key);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+
+// com.gemstone.gemfire.internal.util.DebuggerSupport.waitForJavaDebugger(getLogWriter());
+ assertEquals(value, region.get(key));
+
+ assertTrue(loader1.wasInvoked());
+ assertEquals(value, region.getEntry(key).getValue());
+ }
+
+
+ /**
+ * Tests that {@link Region#get} returns <code>null</code> when
+ * there is no remote loader.
+ */
+ public void testNoRemoteCacheLoader() throws InterruptedException {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+// final Object value = "VALUE";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ createRegion(name);
+ }
+ };
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ vm0.invoke(new CacheSerializableRunnable("Remote load") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ assertNull(region.get(key));
+ }
+ });
+ }
+
+ /**
+ * Tests that a remote <code>CacheLoader</code> is not invoked if
+ * the remote region has an invalid entry (that is, a key, but no
+ * value).
+ */
+ public void testNoLoaderWithInvalidEntry() {
+ if (!supportsNetLoad()) {
+ return;
+ }
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object value = "VALUE";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+ loader = new TestCacheLoader() {
+ public Object load2(LoaderHelper helper)
+ throws CacheLoaderException {
+
+ return value;
+ }
+ };
+ region.getAttributesMutator().setCacheLoader(loader);
+ }
+ };
+
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ vm1.invoke(new CacheSerializableRunnable("Create invalid entry") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.create(key, null);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Remote get") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+// DebuggerSupport.waitForJavaDebugger(getLogWriter());
+ assertEquals(value, region.get(key));
+ assertTrue(loader.wasInvoked());
+ }
+ });
+
+ vm1.invoke(new SerializableRunnable("Verify loader") {
+ public void run() {
+ assertFalse(loader.wasInvoked());
+ }
+ });
+ }
+
+ /**
+ * Tests that a remote {@link CacheWriter} is invoked and that
+ * <code>CacheWriter</code> arguments and {@link
+ * CacheWriterException}s are propagated appropriately.
+ */
+ public void testRemoteCacheWriter() throws InterruptedException {
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object oldValue = "OLD_VALUE";
+ final Object newValue = "NEW_VALUE";
+ final Object arg = "ARG";
+ final Object exception = "EXCEPTION";
+
+ final Object key2 = "KEY2";
+ final Object value2 = "VALUE2";
+
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create Region") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+
+ // Put key2 in the region before any callbacks are
+ // registered, so it can be destroyed later
+ region.put(key2, value2);
+ assertEquals(1, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ LocalRegion reRegion;
+ reRegion = (LocalRegion) region;
+ RegionEntry re = reRegion.getRegionEntry(key2);
+ MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) re._getValue();
+ assertEquals(1, mc.getRefCount());
+ assertEquals(1, ma.getStats().getObjects());
+ }
+ }
+ };
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(create);
+ vm1.invoke(create);
+
+ //////// Create
+
+ vm1.invoke(new CacheSerializableRunnable("Set Writer") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ writer = new TestCacheWriter() {
+ public void beforeCreate2(EntryEvent event)
+ throws CacheWriterException {
+
+ if (exception.equals(event.getCallbackArgument())) {
+ String s = "Test Exception";
+ throw new CacheWriterException(s);
+ }
+
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isCreate());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(key, event.getKey());
+ assertEquals(null, event.getOldValue());
+ assertEquals(oldValue, event.getNewValue());
+ assertFalse(event.getOperation().isLoad());
+ assertFalse(event.getOperation().isLocalLoad());
+ assertFalse(event.getOperation().isNetLoad());
+ assertFalse(event.getOperation().isNetSearch());
+
+ }
+ };
+ region.getAttributesMutator().setCacheWriter(writer);
+ flushIfNecessary(region);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Create with Exception") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ try {
+ region.put(key, oldValue, exception);
+ fail("Should have thrown a CacheWriterException");
+
+ } catch (CacheWriterException ex) {
+ assertNull(region.getEntry(key));
+ assertEquals(1, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(1, ma.getStats().getObjects());
+ }
+ }
+ }
+ });
+
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Create with Argument") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, oldValue, arg);
+ assertEquals(2, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(2, ma.getStats().getObjects());
+ LocalRegion reRegion;
+ reRegion = (LocalRegion) region;
+ MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
+ assertEquals(1, mc.getRefCount());
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ //////// Update
+
+ vm1.invoke(new CacheSerializableRunnable("Set Writer") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ writer = new TestCacheWriter() {
+ public void beforeUpdate2(EntryEvent event)
+ throws CacheWriterException {
+
+ Object argument = event.getCallbackArgument();
+ if (exception.equals(argument)) {
+ String s = "Test Exception";
+ throw new CacheWriterException(s);
+ }
+
+ assertEquals(arg, argument);
+
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isUpdate());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(key, event.getKey());
+ assertEquals(oldValue, event.getOldValue());
+ assertEquals(newValue, event.getNewValue());
+ assertFalse(event.getOperation().isLoad());
+ assertFalse(event.getOperation().isLocalLoad());
+ assertFalse(event.getOperation().isNetLoad());
+ assertFalse(event.getOperation().isNetSearch());
+
+ }
+ };
+ region.getAttributesMutator().setCacheWriter(writer);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Update with Exception") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ try {
+ region.put(key, newValue, exception);
+ fail("Should have thrown a CacheWriterException");
+
+ } catch (CacheWriterException ex) {
+ Region.Entry entry = region.getEntry(key);
+ assertEquals(oldValue, entry.getValue());
+ assertEquals(2, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(2, ma.getStats().getObjects());
+ LocalRegion reRegion;
+ reRegion = (LocalRegion) region;
+ MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
+ assertEquals(1, mc.getRefCount());
+ }
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Update with Argument") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.put(key, newValue, arg);
+ assertEquals(2, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(2, ma.getStats().getObjects());
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ //////// Destroy
+
+ vm1.invoke(new CacheSerializableRunnable("Set Writer") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ writer = new TestCacheWriter() {
+ public void beforeDestroy2(EntryEvent event)
+ throws CacheWriterException {
+
+ Object argument = event.getCallbackArgument();
+ if (exception.equals(argument)) {
+ String s = "Test Exception";
+ throw new CacheWriterException(s);
+ }
+
+ assertEquals(arg, argument);
+
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isDestroy());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ assertEquals(key, event.getKey());
+ assertEquals(newValue, event.getOldValue());
+ assertNull(event.getNewValue());
+ assertFalse(event.getOperation().isLoad());
+ assertFalse(event.getOperation().isLocalLoad());
+ assertFalse(event.getOperation().isNetLoad());
+ assertFalse(event.getOperation().isNetSearch());
+ }
+ };
+ region.getAttributesMutator().setCacheWriter(writer);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy with Exception") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ try {
+ region.destroy(key, exception);
+ fail("Should have thrown a CacheWriterException");
+
+ } catch (CacheWriterException ex) {
+ assertNotNull(region.getEntry(key));
+ assertEquals(2, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(2, ma.getStats().getObjects());
+ }
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy with Argument") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ region.destroy(key, arg);
+ assertEquals(1, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(1, ma.getStats().getObjects());
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ //////// Region Destroy
+
+ vm1.invoke(new CacheSerializableRunnable("Set Writer") {
+ public void run2() throws CacheException {
+ final Region region =
+ getRootRegion().getSubregion(name);
+ writer = new TestCacheWriter() {
+ public void beforeRegionDestroy2(RegionEvent event)
+ throws CacheWriterException {
+
+ Object argument = event.getCallbackArgument();
+ if (exception.equals(argument)) {
+ String s = "Test Exception";
+ throw new CacheWriterException(s);
+ }
+
+ assertEquals(arg, argument);
+
+ assertEquals(region, event.getRegion());
+ assertTrue(event.getOperation().isRegionDestroy());
+ assertTrue(event.getOperation().isDistributed());
+ assertFalse(event.getOperation().isExpiration());
+ assertTrue(event.isOriginRemote());
+ }
+ };
+ region.getAttributesMutator().setCacheWriter(writer);
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy with Exception") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ try {
+ region.destroyRegion(exception);
+ fail("Should have thrown a CacheWriterException");
+
+ } catch (CacheWriterException ex) {
+ if (region.isDestroyed()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("should not have an exception if region is destroyed", ex);
+ }
+ assertEquals(1, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(1, ma.getStats().getObjects());
+ }
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+
+ vm0.invoke(new CacheSerializableRunnable("Destroy with Argument") {
+ public void run2() throws CacheException {
+ Region region =
+ getRootRegion().getSubregion(name);
+ assertEquals(1, region.size());
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ assertEquals(1, ma.getStats().getObjects());
+ }
+ region.destroyRegion(arg);
+ if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
+ GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
+ final SimpleMemoryAllocatorImpl ma = (SimpleMemoryAllocatorImpl) gfc.getOffHeapStore();
+ WaitCriterion waitForStatChange = new WaitCriterion() {
+ public boolean done() {
+ return ma.getStats().getObjects() == 0;
+ }
+ public String description() {
+ return "never saw off-heap object count go to zero. Last value was " + ma.getStats().getObjects();
+ }
+ };
+ Wait.waitForCriterion(waitForStatChange, 3000, 10, true);
+ }
+ }
+ });
+ vm1.invoke(new SerializableRunnable("Verify callback") {
+ public void run() {
+ assertTrue(writer.wasInvoked());
+ }
+ });
+ }
+
+ /**
+ * Tests that, when given a choice, a local <code>CacheWriter</code>
+ * is invoked instead of a remote one.
+ */
+ public void testLocalAndRemoteCacheWriters()
+ throws InterruptedException {
+
+ assertTrue(getRegionAttributes().getScope().isDistributed());
+
+ final String name = this.getUniqueName();
+ final Object key = "KEY";
+ final Object oldValue = "OLD_VALUE";
+ final Object newValue = "NEW_VALUE";
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+
+ vm0.invoke(new CacheSerializableRunnable("Create \"Local\" Region") {
+ public void run2() throws CacheException {
+ Region region = createRegion(name);
+ writer = new TestCacheWriter() {
+ public void beforeUpdate2(EntryEvent event)
+ throws CacheWriterException { }
+
+ p
<TRUNCATED>