You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/04/23 05:46:46 UTC

[geode] 08/13: GEODE-1279: Rename BucketCreationCrash RegressionTests

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0ac55878e52a3abe83617550950debb2b2485742
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Apr 12 12:59:21 2018 -0700

    GEODE-1279: Rename BucketCreationCrash RegressionTests
    
    * BucketCreationRequesterCrashHARegressionTest ->
      BucketCreationCrashNoHangRegressionTest
    * Bug39356DUnitTest -> BucketCreationCrashCompletesRegressionTest
---
 ...java => BucketCreationCrashRegressionTest.java} |  46 ++--
 ...BucketCreationCrashCompletesRegressionTest.java | 248 +++++++++++++++++++++
 .../cache/partitioned/Bug39356DUnitTest.java       | 233 -------------------
 3 files changed, 273 insertions(+), 254 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationRequesterCrashHARegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
similarity index 88%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationRequesterCrashHARegressionTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
index c0e124f..41c5617 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationRequesterCrashHARegressionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketCreationCrashRegressionTest.java
@@ -19,14 +19,16 @@ import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWOR
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
 import static org.apache.geode.test.dunit.DistributedTestUtils.crashDistributedSystem;
-import static org.apache.geode.test.dunit.Host.getHost;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.util.Properties;
 import java.util.Set;
@@ -50,26 +52,25 @@ import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.cache.partitioned.ManageBucketMessage;
 import org.apache.geode.internal.cache.partitioned.ManageBucketMessage.ManageBucketReplyMessage;
-import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.dunit.rules.SharedErrorCollector;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
 import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
 
 /**
- * Test to make sure that we can handle a crash of the member directing bucket creation.
- *
- * BucketCreationRequesterCrashHARegressionTest
+ * Verifies that new bucket does not hang after requester crashes.
  *
  * <p>
  * TRAC #41733: Hang in BucketAdvisor.waitForPrimaryMember
  */
 @Category(DistributedTest.class)
-public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase {
+@SuppressWarnings("serial")
+public class BucketCreationCrashRegressionTest implements Serializable {
 
   private String uniqueName;
   private String hostName;
@@ -81,6 +82,12 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
   private VM locator;
 
   @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
   public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
 
   @Rule
@@ -91,12 +98,12 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
 
   @Before
   public void setUp() throws Exception {
-    server1 = getHost(0).getVM(0);
-    server2 = getHost(0).getVM(1);
-    locator = getHost(0).getVM(2);
+    server1 = getVM(0);
+    server2 = getVM(1);
+    locator = getVM(2);
 
     uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
-    hostName = NetworkUtils.getServerHostName(server1.getHost());
+    hostName = getHostName();
     locatorLog = new File(temporaryFolder.newFolder(uniqueName), "locator.log");
 
     locatorPort = locator.invoke(() -> startLocator());
@@ -106,9 +113,8 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
     server2.invoke(() -> createServerCache());
 
     // cluster should ONLY have 3 members (our 2 servers and 1 locator)
-    assertThat(server1.invoke(
-        () -> getCache().getDistributionManager().getDistributionManagerIdsIncludingAdmin()))
-            .hasSize(3);
+    assertThat(server1.invoke(() -> cacheRule.getCache().getDistributionManager()
+        .getDistributionManagerIdsIncludingAdmin())).hasSize(3);
 
     addIgnoredException(ForcedDisconnectException.class);
   }
@@ -119,8 +125,6 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
     invokeInEveryVM(() -> {
       DistributionMessageObserver.setInstance(null);
     });
-
-    disconnectAllFromDS();
   }
 
   /**
@@ -199,7 +203,7 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
   }
 
   private void createServerCache() {
-    getCache(createServerConfig());
+    cacheRule.createCache(createServerConfig());
   }
 
   private void createPartitionedRegion() {
@@ -210,11 +214,11 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
     af.setDataPolicy(DataPolicy.PARTITION);
     af.setPartitionAttributes(paf.create());
 
-    getCache().createRegion(uniqueName, af.create());
+    cacheRule.getCache().createRegion(uniqueName, af.create());
   }
 
   private void putData(final int startKey, final int endKey, final String value) {
-    Region<Integer, String> region = getCache().getRegion(uniqueName);
+    Region<Integer, String> region = cacheRule.getCache().getRegion(uniqueName);
 
     for (int i = startKey; i < endKey; i++) {
       region.put(i, value);
@@ -222,7 +226,7 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
   }
 
   private Set<Integer> getBucketList() {
-    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(uniqueName);
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(uniqueName);
     return new TreeSet<>(region.getDataStore().getAllLocalBucketIds());
   }
 
@@ -233,7 +237,7 @@ public class BucketCreationRequesterCrashHARegressionTest extends CacheTestCase
   }
 
   private void crashServer() {
-    crashDistributedSystem(getSystem());
+    crashDistributedSystem(cacheRule.getSystem());
   }
 
   private class RunnableBeforeProcessMessageObserver extends DistributionMessageObserver {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/BucketCreationCrashCompletesRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/BucketCreationCrashCompletesRegressionTest.java
new file mode 100644
index 0000000..8415408
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/BucketCreationCrashCompletesRegressionTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
+import static org.apache.geode.test.dunit.DistributedTestUtils.crashDistributedSystem;
+import static org.apache.geode.test.dunit.DistributedTestUtils.getAllDistributedSystemProperties;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * Verifies that bucket creation completes even after requester crashes.
+ *
+ * <p>
+ * TRAC #39356: Missing PR buckets with HA
+ */
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class BucketCreationCrashCompletesRegressionTest implements Serializable {
+
+  private String regionName;
+
+  private VM vm0;
+  private VM vm1;
+  private VM vm2;
+
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setUp() {
+    vm0 = getVM(0);
+    vm1 = getVM(1);
+    vm2 = getVM(2);
+
+    regionName = getClass().getSimpleName() + "_" + testName.getMethodName();
+
+    vm0.invoke(() -> createCache(getDistributedSystemProperties()));
+    vm1.invoke(() -> createCache(getDistributedSystemProperties()));
+    vm2.invoke(() -> createCache(getDistributedSystemProperties()));
+  }
+
+  /**
+   * This tests the case where the VM forcing other VMs to create a bucket crashes while creating
+   * the bucket.
+   */
+  @Test
+  public void testCrashWhileCreatingABucket() {
+    vm1.invoke(() -> createPartitionedRegionWithObserver());
+    vm2.invoke(() -> createPartitionedRegionWithObserver());
+
+    vm0.invoke(() -> createAccessorAndCrash());
+
+    vm1.invoke(() -> verifyBucketsAfterAccessorCrashes());
+    vm2.invoke(() -> verifyBucketsAfterAccessorCrashes());
+  }
+
+  /**
+   * A test to make sure that we cannot move a bucket to a member which already hosts the bucket,
+   * thereby reducing our redundancy.
+   */
+  @Test
+  public void testMoveBucketToHostThatHasTheBucketAlready() {
+    vm0.invoke(() -> createPartitionedRegion());
+    vm1.invoke(() -> createPartitionedRegion());
+
+    // Create a bucket
+    vm0.invoke(() -> {
+      createBucket();
+    });
+
+    InternalDistributedMember member1 = vm1.invoke(() -> getCache().getMyId());
+
+    // Move the bucket
+    vm0.invoke(() -> {
+      verifyCannotMoveBucketToExistingHost(member1);
+    });
+  }
+
+  private void createPartitionedRegionWithObserver() {
+    DistributionMessageObserver.setInstance(new MyRegionObserver());
+
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(1);
+    paf.setRecoveryDelay(0);
+
+    AttributesFactory af = new AttributesFactory();
+    af.setDataPolicy(DataPolicy.PARTITION);
+    af.setPartitionAttributes(paf.create());
+
+    getCache().createRegion(regionName, af.create());
+  }
+
+  private void createAccessorAndCrash() {
+    PartitionAttributesFactory<String, String> paf = new PartitionAttributesFactory<>();
+    paf.setRedundantCopies(1);
+    paf.setLocalMaxMemory(0);
+
+    AttributesFactory<String, String> af = new AttributesFactory<>();
+    af.setDataPolicy(DataPolicy.PARTITION);
+    af.setPartitionAttributes(paf.create());
+
+    Region<String, String> region = getCache().createRegion(regionName, af.create());
+
+    // trigger the creation of a bucket, which should trigger the destruction of this VM.
+    assertThatThrownBy(() -> region.put("ping", "pong")).isInstanceOf(CancelException.class);
+  }
+
+  private boolean hasBucketOwners(PartitionedRegion partitionedRegion, int bucketId) {
+    try {
+      return partitionedRegion.getBucketOwnersForValidation(bucketId) != null;
+    } catch (ForceReattemptException e) {
+      return false;
+    }
+  }
+
+  private void verifyBucketsAfterAccessorCrashes() throws ForceReattemptException {
+    PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
+    for (int i = 0; i < partitionedRegion.getAttributes().getPartitionAttributes()
+        .getTotalNumBuckets(); i++) {
+      int bucketId = i;
+
+      // The awaited condition must be the boolean returned by hasBucketOwners; a block lambda
+      // that drops that result never actually waits for the bucket owners to become available.
+      await().atMost(2, MINUTES).until(() -> hasBucketOwners(partitionedRegion, bucketId));
+
+      List owners = partitionedRegion.getBucketOwnersForValidation(bucketId);
+      assertThat(owners).isNotNull();
+      if (owners.isEmpty()) {
+        continue;
+      }
+      assertThat(owners).hasSize(2);
+    }
+  }
+
+  private void createPartitionedRegion() {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setRedundantCopies(1);
+    paf.setRecoveryDelay(-1);
+    paf.setStartupRecoveryDelay(-1);
+
+    AttributesFactory af = new AttributesFactory();
+    af.setPartitionAttributes(paf.create());
+
+    getCache().createRegion(regionName, af.create());
+  }
+
+  private void createBucket() {
+    Region<Integer, String> region = getCache().getRegion(regionName);
+    region.put(0, "A");
+  }
+
+  private void verifyCannotMoveBucketToExistingHost(InternalDistributedMember member1) {
+    PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
+    Set<InternalDistributedMember> bucketOwners =
+        partitionedRegion.getRegionAdvisor().getBucketOwners(0);
+
+    assertThat(bucketOwners).hasSize(2);
+
+    PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
+
+    assertThat(dataStore.isManagingBucket(0)).isTrue();
+    // try to move the bucket from the other member to this one. This should
+    // fail because we already have the bucket
+    assertThat(dataStore.moveBucket(0, member1, true)).isFalse();
+    assertThat(partitionedRegion.getRegionAdvisor().getBucketOwners(0)).isEqualTo(bucketOwners);
+  }
+
+  private void crashServer() {
+    crashDistributedSystem(cacheRule.getSystem());
+  }
+
+  private InternalCache getCache() {
+    return cacheRule.getCache();
+  }
+
+  private void createCache(Properties config) {
+    cacheRule.createCache(config);
+  }
+
+  public Properties getDistributedSystemProperties() {
+    Properties config = new Properties();
+    config.put(ENABLE_NETWORK_PARTITION_DETECTION, "false");
+    return getAllDistributedSystemProperties(config);
+  }
+
+  private class MyRegionObserver extends DistributionMessageObserver implements Serializable {
+
+    @Override
+    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
+      if (message instanceof ManageBucketMessage) {
+        vm0.invoke(() -> {
+          crashServer();
+        });
+      }
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/Bug39356DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/Bug39356DUnitTest.java
deleted file mode 100644
index d6b9ee8..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/Bug39356DUnitTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.partitioned;
-
-import static org.junit.Assert.*;
-
-import java.io.Serializable;
-import java.util.*;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.LogWriter;
-import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.PartitionAttributes;
-import org.apache.geode.cache.PartitionAttributesFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.distributed.*;
-import org.apache.geode.distributed.internal.ClusterDistributionManager;
-import org.apache.geode.distributed.internal.DistributionMessage;
-import org.apache.geode.distributed.internal.DistributionMessageObserver;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.ForceReattemptException;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionDataStore;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.SerializableCallable;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-@Category(DistributedTest.class)
-public class Bug39356DUnitTest extends JUnit4CacheTestCase {
-
-  protected static final String REGION_NAME = "myregion";
-
-  @Override
-  public Properties getDistributedSystemProperties() {
-    Properties result = super.getDistributedSystemProperties();
-    result.put(ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION, "false");
-    return result;
-  }
-
-  /**
-   * This tests the case where the VM forcing other VMs to create a bucket crashes while creating
-   * the bucket.
-   */
-  @Test
-  public void testCrashWhileCreatingABucket() {
-    Host host = Host.getHost(0);
-    final VM vm0 = host.getVM(0);
-    final VM vm1 = host.getVM(1);
-    final VM vm2 = host.getVM(2);
-
-    SerializableRunnable createParReg = new SerializableRunnable("Create parReg") {
-      public void run() {
-        DistributionMessageObserver.setInstance(new MyRegionObserver(vm0));
-        Cache cache = getCache();
-        AttributesFactory af = new AttributesFactory();
-        PartitionAttributesFactory pf = new PartitionAttributesFactory();
-        pf.setRedundantCopies(1);
-        pf.setRecoveryDelay(0);
-        af.setDataPolicy(DataPolicy.PARTITION);
-        af.setPartitionAttributes(pf.create());
-        cache.createRegion(REGION_NAME, af.create());
-      }
-    };
-    vm1.invoke(createParReg);
-    vm2.invoke(createParReg);
-
-    SerializableRunnable createParRegAccessor = new SerializableRunnable("Create parReg") {
-      public void run() {
-        Cache cache = getCache();
-        AttributesFactory af = new AttributesFactory();
-        PartitionAttributesFactory pf = new PartitionAttributesFactory();
-        pf.setRedundantCopies(1);
-        pf.setLocalMaxMemory(0);
-        af.setDataPolicy(DataPolicy.PARTITION);
-        af.setPartitionAttributes(pf.create());
-        Region r = cache.createRegion(REGION_NAME, af.create());
-
-        // trigger the creation of a bucket, which should trigger the destruction of this VM.
-        try {
-          r.put("ping", "pong");
-          fail("Should have gotten a CancelException");
-        } catch (CancelException e) {
-          // this is ok, we expect our observer to close this cache.
-        }
-      }
-    };
-
-    vm0.invoke(createParRegAccessor);
-
-    SerializableRunnable verifyBuckets = new SerializableRunnable("Verify buckets") {
-
-      public void run() {
-        LogWriter log = org.apache.geode.test.dunit.LogWriterUtils.getLogWriter();
-        Cache cache = getCache();
-        PartitionedRegion r = (PartitionedRegion) cache.getRegion(REGION_NAME);
-        for (int i = 0; i < r.getAttributes().getPartitionAttributes().getTotalNumBuckets(); i++) {
-          List owners = null;
-          while (owners == null) {
-            try {
-              owners = r.getBucketOwnersForValidation(i);
-            } catch (ForceReattemptException e) {
-              log.info(
-                  Bug39356DUnitTest.class + " verify buckets Caught a ForceReattemptException");
-              Wait.pause(1000);
-            }
-          }
-          if (owners.isEmpty()) {
-            log.info("skipping bucket " + i + " because it has no data");
-            continue;
-          }
-          assertEquals("Expecting bucket " + i + " to have two copies", 2, owners.size());
-          log.info("bucket " + i + " had two copies");
-        }
-      }
-    };
-    vm1.invoke(verifyBuckets);
-    vm2.invoke(verifyBuckets);
-  }
-
-  protected class MyRegionObserver extends DistributionMessageObserver implements Serializable {
-    private final VM vm0;
-
-    MyRegionObserver(VM vm0) {
-      this.vm0 = vm0;
-    }
-
-
-    public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {}
-
-
-    public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
-      if (message instanceof ManageBucketMessage) {
-        vm0.invoke(new SerializableRunnable("Disconnect VM 0") {
-          public void run() {
-            disconnectFromDS();
-            try {
-              Thread.sleep(10000);
-            } catch (InterruptedException e) {
-              fail("interrupted");
-            }
-          }
-        });
-      }
-    }
-
-  }
-
-  /**
-   * A test to make sure that we cannot move a bucket to a member which already hosts the bucket,
-   * thereby reducing our redundancy.
-   */
-  @Test
-  public void testMoveBucketToHostThatHasTheBucketAlready() {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-
-    SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
-      public void run() {
-        Cache cache = getCache();
-        AttributesFactory attr = new AttributesFactory();
-        PartitionAttributesFactory paf = new PartitionAttributesFactory();
-        paf.setRedundantCopies(1);
-        paf.setRecoveryDelay(-1);
-        paf.setStartupRecoveryDelay(-1);
-        PartitionAttributes prAttr = paf.create();
-        attr.setPartitionAttributes(prAttr);
-        cache.createRegion("region1", attr.create());
-      }
-    };
-
-    vm0.invoke(createPrRegion);
-    vm1.invoke(createPrRegion);
-
-    // Create a bucket
-    vm0.invoke(new SerializableRunnable("createSomeBuckets") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion("region1");
-        region.put(Integer.valueOf(0), "A");
-      }
-    });
-
-    final InternalDistributedMember vm1MemberId =
-        (InternalDistributedMember) vm1.invoke(new SerializableCallable() {
-
-          public Object call() throws Exception {
-            return InternalDistributedSystem.getAnyInstance().getDistributedMember();
-          }
-        });
-
-
-    // Move the bucket
-    vm0.invoke(new SerializableRunnable("moveBucket") {
-
-      public void run() {
-        Cache cache = getCache();
-        PartitionedRegion region = (PartitionedRegion) cache.getRegion("region1");
-        Set<InternalDistributedMember> owners = region.getRegionAdvisor().getBucketOwners(0);
-        assertEquals(2, owners.size());
-        PartitionedRegionDataStore ds = region.getDataStore();
-        assertTrue(ds.isManagingBucket(0));
-        // try to move the bucket from the other member to this one. This should
-        // fail because we already have the bucket
-        assertFalse(ds.moveBucket(0, vm1MemberId, true));
-        assertEquals(owners, region.getRegionAdvisor().getBucketOwners(0));
-      }
-    });
-  }
-}

-- 
To stop receiving notification emails like this one, please contact
klund@apache.org.