Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2022/01/06 20:21:32 UTC

[GitHub] [geode] DonalEvans commented on a change in pull request #7124: GEODE-9815: Prefer to remove a redundant copy in the same zone

DonalEvans commented on a change in pull request #7124:
URL: https://github.com/apache/geode/pull/7124#discussion_r779775923



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
##########
@@ -468,6 +507,41 @@ public Move findBestTarget(Bucket bucket, boolean checkIPAddress) {
     return bestMove;
   }
 
+  String getRedundancyZone(InternalDistributedMember memberID) {
+    assert (partitionedRegion != null);

Review comment:
       Do we want to be using `assert` here? Asserts aren't enabled by default, so in most cases this line will be skipped and not do anything, and even if assertions are enabled, all this line is doing is replacing a `NullPointerException` on the following line with an `AssertionError`.
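      A minimal sketch of an explicit check that, unlike `assert`, runs whether or not the JVM is started with `-ea`. The class, field and method names are hypothetical stand-ins for `PartitionedRegionLoadModel` and `getRedundancyZone`. `Objects.requireNonNull` is only one option; simply deleting the `assert` is another.

```java
import java.util.Objects;

// Hypothetical stand-in for the model class; only the null-check pattern matters here.
class NullCheckSketch {
  private final Object partitionedRegion;

  NullCheckSketch(Object partitionedRegion) {
    this.partitionedRegion = partitionedRegion;
  }

  String regionDescription() {
    // Always evaluated, so a missing region fails fast with a descriptive message
    // instead of depending on whether assertions happen to be enabled.
    Objects.requireNonNull(partitionedRegion, "partitionedRegion has not been set");
    return partitionedRegion.toString();
  }

  // Common idiom for detecting -ea: the assignment inside the assert
  // only executes when assertions are enabled.
  static boolean assertionsEnabled() {
    boolean enabled = false;
    assert enabled = true;
    return enabled;
  }

  public static void main(String[] args) {
    System.out.println("assertions enabled: " + assertionsEnabled());
    System.out.println(new NullCheckSketch("regionA").regionDescription());
    try {
      new NullCheckSketch(null).regionDescription();
    } catch (NullPointerException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

      Run without `-ea`, the first line reports `false`, which is the reviewer's point: the `assert` line is normally skipped entirely.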

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * The purpose of RebalanceOperationComplexDistributedTest is to test rebalances
+ * across zones and to ensure that enforceUniqueZone behavior of redundancy zones
+ * is working correctly.
+ */
+public class RebalanceOperationComplexPart2DistributedTest
+    implements Serializable {
+
+  private static final int EXPECTED_BUCKET_COUNT = 113;
+  private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
+  private static final String REGION_NAME = "primary";
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String ZONE_A = "zoneA";
+  private static final String ZONE_B = "zoneB";
+
+  private int locatorPort;
+  private static final AtomicInteger runID = new AtomicInteger(0);
+  private String workingDir;
+
+  // 6 servers distributed evenly across 2 zones
+  private static Map<Integer, String> SERVER_ZONE_MAP;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5);
+
+  @Before
+  public void setup() {
+    // Start the locator
+    MemberVM locatorVM = clusterStartupRule.startLocatorVM(0);
+    locatorPort = locatorVM.getPort();
+
+    workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath();
+
+    runID.incrementAndGet();
+  }
+
+  @After
+  public void after() {
+    stopServersAndDeleteDirectories();
+  }
+
+  protected void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }
+
+  /**
+   * Test that we correctly use the redundancy-zone property to determine where to place redundant
+   * copies of a buckets and doesn't allow cross redundancy zone deletes. This does not use a
+   * rebalance once the servers are down.
+   *
+   */
+  @Test
+  public void testRecoveryWithOneServerPermanentlyDownAndOneRestarted() throws Exception {
+
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_A);
+        put(3, ZONE_B);
+        put(4, ZONE_B);
+      }
+    };

Review comment:
      Several warnings in this class come from this double-brace initialization, which declares an anonymous subclass of `HashMap`. They can be fixed by replacing it with:
   ```
       SERVER_ZONE_MAP = new HashMap<>();
       SERVER_ZONE_MAP.put(1, ZONE_A);
       SERVER_ZONE_MAP.put(2, ZONE_A);
       SERVER_ZONE_MAP.put(3, ZONE_B);
       SERVER_ZONE_MAP.put(4, ZONE_B);
   ```
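      A small standalone sketch of the difference between the two forms: double-brace initialization produces an instance of an anonymous `HashMap` subclass, while the suggested form produces a plain `HashMap` with the same contents. The class and method names are hypothetical; only the two initialization styles come from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

class MapInitializationSketch {
  static final String ZONE_A = "zoneA";
  static final String ZONE_B = "zoneB";

  // Double-brace initialization: an anonymous subclass of HashMap whose
  // instance initializer block calls put().
  static Map<Integer, String> doubleBrace() {
    return new HashMap<Integer, String>() {
      {
        put(1, ZONE_A);
        put(2, ZONE_B);
      }
    };
  }

  // Suggested form: a plain HashMap populated with explicit put() calls.
  static Map<Integer, String> plain() {
    Map<Integer, String> map = new HashMap<>();
    map.put(1, ZONE_A);
    map.put(2, ZONE_B);
    return map;
  }

  public static void main(String[] args) {
    System.out.println(doubleBrace().getClass() == HashMap.class); // false
    System.out.println(plain().getClass() == HashMap.class); // true
    System.out.println(doubleBrace().equals(plain())); // true: same entries
  }
}
```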

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
##########
@@ -902,19 +927,555 @@ public void testMoveBuckets() throws UnknownHostException {
         buildDetails(member2, 500, 500, new long[] {0, 0, 0, 0}, new long[] {0, 0, 0, 0});
     model.addRegion("a", Arrays.asList(details1, details2), new FakeOfflineDetails(), true);
 
-    assertEquals(2, doMoves(new CompositeDirector(false, false, true, true), model));
+    assertThat(doMoves(new CompositeDirector(false, false, true, true), model)).isEqualTo(2);
 
-    assertEquals(Collections.emptyList(), bucketOperator.creates);
-    assertEquals(Collections.emptyList(), bucketOperator.primaryMoves);
+    assertThat(bucketOperator.creates).isEqualTo(Collections.emptyList());
+    assertThat(bucketOperator.primaryMoves).isEqualTo(Collections.emptyList());
 
     // Two of the buckets should move to member2
     List<Move> expectedMoves = new ArrayList<>();
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
 
-    assertEquals(expectedMoves, bucketOperator.bucketMoves);
+    assertThat(bucketOperator.bucketMoves).isEqualTo(expectedMoves);
+  }
+
+  @Test
+  public void testRemoveBuckets() throws UnknownHostException {

Review comment:
      The names of the tests added to this class in this PR are quite confusing to me. Would it be possible to rename them so that it's clear what is being tested, under what conditions, and what the expected outcome is? My personal preference is always for clarity in test names, even if they end up being very long as a result.

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+
+    cleanOutServerDirectories();
+
+    // Startup the servers
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      startServerInRedundancyZone(entry.getKey(), entry.getValue());
+    }
+
+    // Put data in the server regions
+    clientPopulateServers();
+
+    // Rebalance Server VM will initiate the rebalances in this test
+    VM server2 = clusterStartupRule.getVM(2);
+
+    // Take the server 1 offline
+    clusterStartupRule.stop(1, false);
+
+    // Baseline rebalance with everything up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Stop server 3 never to start again.
+    clusterStartupRule.stop(3, true);
+
+    // Restart the server 1
+    startServerInRedundancyZone(1, SERVER_ZONE_MAP.get(1));
+
+    // Rebalance with remaining servers up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // print the bucket count on server 4 for debug. Should be 113 because server 3 is still down
+    assertThat(getBucketCount(4)).isEqualTo(113);
+
+    assertThat(getBucketCount(1)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    assertThat(getBucketCount(2)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    // Verify that all bucket counts add up to what they should
+    int zoneABucketCount = getZoneBucketCount(REGION_NAME, ZONE_A);
+    assertThat(zoneABucketCount).isEqualTo(EXPECTED_BUCKET_COUNT);
+  }
+
+  private int getBucketCount(int server) {
+    return clusterStartupRule.getVM(server).invoke(() -> {
+      PartitionedRegion region =
+          (PartitionedRegion) ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      return region.getLocalBucketsListTestOnly().size();
+    });
+  }
+
+  protected void cleanOutServerDirectory(int server) {
+    VM.getVM(server).invoke(() -> {
+      String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + server;
+      File temporaryDirectory = new File(path);
+      if (temporaryDirectory.exists()) {
+        try {
+          Arrays.stream(temporaryDirectory.listFiles()).forEach(FileUtils::deleteQuietly);
+          Files.delete(temporaryDirectory.toPath());
+        } catch (Exception exception) {
+          logger.error("The delete of files or directory failed ", exception);
+          throw exception;
+        }
+      }
+    });
+  }
+
+  protected void cleanOutServerDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      cleanOutServerDirectory(entry.getKey());
+    }
+  }
+
+  /**
+   * Startup a client to put all the data in the server regions
+   */
+  protected void clientPopulateServers() throws Exception {
+    Properties properties2 = new Properties();
+    ClientVM clientVM =
+        clusterStartupRule.startClientVM(SERVER_ZONE_MAP.size() + 1, properties2,
+            ccf -> ccf.addPoolLocator("localhost", locatorPort));
+
+    clientVM.invoke(() -> {
+
+      Map<Integer, String> putMap = new HashMap<>();
+      for (int i = 0; i < 1000; i++) {
+        putMap.put(i, "A");
+      }
+
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          clientCache.createClientRegionFactory(
+              ClientRegionShortcut.PROXY);
+      clientRegionFactory.create(REGION_NAME);
+      Region<Integer, String> region = clientCache.getRegion(REGION_NAME);
+      region.putAll(putMap);
+    });
+  }
+
+  /**
+   * Startup server *index* in *redundancy zone*
+   *
+   * @param index - server
+   * @param zone - Redundancy zone for the server to be started in
+   */
+  protected void startServerInRedundancyZone(int index, final String zone) {
+
+    clusterStartupRule.startServerVM(index, s -> s
+        .withProperty(REDUNDANCY_ZONE, zone)
+        .withConnectionToLocator(locatorPort));
+
+    VM.getVM(index).invoke(() -> {
+      RegionFactory<Object, Object> regionFactory =
+          ClusterStartupRule.getCache().createRegionFactory(
+              RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+      PartitionAttributesImpl partitionAttributesImpl = new PartitionAttributesImpl();
+      partitionAttributesImpl.setRedundantCopies(1);
+      partitionAttributesImpl.setStartupRecoveryDelay(-1);
+      partitionAttributesImpl.setRecoveryDelay(-1);
+      partitionAttributesImpl.setTotalNumBuckets(113);

Review comment:
       Should this hard-coded number be `EXPECTED_BUCKET_COUNT`?
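      As a sketch (an assumption, not part of the PR), the other hard-coded values in the test could be derived from the same constant: server 4 is the only live member of zoneB, so it should hold one copy of all `EXPECTED_BUCKET_COUNT` buckets, while zoneA's two servers should split them as evenly as possible. The helper names below are hypothetical.

```java
class BucketCountSketch {
  static final int EXPECTED_BUCKET_COUNT = 113;

  // Fewest buckets any member holds when buckets are spread as evenly as possible.
  static int minBucketsPerMember(int bucketCount, int membersInZone) {
    return bucketCount / membersInZone;
  }

  // Most buckets any member holds under the same even spread (ceiling division).
  static int maxBucketsPerMember(int bucketCount, int membersInZone) {
    return (bucketCount + membersInZone - 1) / membersInZone;
  }

  public static void main(String[] args) {
    // Two servers remain in zoneA in the test above.
    System.out.println(minBucketsPerMember(EXPECTED_BUCKET_COUNT, 2)); // 56
    System.out.println(maxBucketsPerMember(EXPECTED_BUCKET_COUNT, 2)); // 57
  }
}
```

      With bounds like these, the assertions could read `assertThat(getBucketCount(1)).isBetween(min, max)`; AssertJ's `isBetween` includes both ends.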

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+    cleanOutServerDirectories();

Review comment:
       Is this method call necessary? `ClusterStartupRule` should not be starting servers in already-existing directories with anything in them.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
##########
@@ -425,19 +425,58 @@ public void remoteOverRedundancyBucket(BucketRollup bucket, Member targetMember)
   }
 
   private void initLowRedundancyBuckets() {
-    this.lowRedundancyBuckets = new TreeSet<>(REDUNDANCY_COMPARATOR);
     for (BucketRollup b : this.buckets) {
       if (b != null && b.getRedundancy() >= 0 && b.getRedundancy() < this.requiredRedundancy) {
         this.lowRedundancyBuckets.add(b);
       }
     }
   }
 
+
+  /**
+   * Original functionality if bucket's redundancy is greater than what is necessary add it to the
+   * over redundancy bucket list, so it can be cleared
+   * <p>
+   * Newly added functionality is to make this so that we don't have a bucket in the same redundancy
+   * zone twice if zones are in use.
+   */
   private void initOverRedundancyBuckets() {

Review comment:
       With the changes here, this method is now doing significantly more than just initializing the `overRedundancyBuckets` set. It might be better to rename it to more accurately reflect what's being done in the method.
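      To make the new behaviour described in the Javadoc above concrete, here is a minimal sketch of finding copies of a bucket that share a redundancy zone. It is not the PR's implementation: the method name, the use of plain member-name strings, and the rule of keeping the first copy seen in each zone are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SameZoneRedundancySketch {
  /**
   * Returns the members hosting a copy of one bucket whose redundancy zone already
   * holds an earlier copy in iteration order. Members without a zone are never flagged.
   */
  static List<String> findSameZoneCopies(List<String> hostingMembers,
      Map<String, String> zoneByMember) {
    Set<String> zonesWithACopy = new HashSet<>();
    List<String> sameZoneCopies = new ArrayList<>();
    for (String member : hostingMembers) {
      String zone = zoneByMember.get(member);
      if (zone == null) {
        continue; // zones not in use for this member
      }
      // Set.add returns false when the zone already holds a copy of this bucket.
      if (!zonesWithACopy.add(zone)) {
        sameZoneCopies.add(member);
      }
    }
    return sameZoneCopies;
  }

  public static void main(String[] args) {
    Map<String, String> zones =
        Map.of("server1", "zoneA", "server2", "zoneA", "server4", "zoneB");
    // server1 and server2 both host the bucket in zoneA, so server2 is the extra copy.
    System.out.println(findSameZoneCopies(List.of("server1", "server2", "server4"), zones));
    // prints [server2]
  }
}
```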

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+    runID.incrementAndGet();

Review comment:
       This seems like leftover debugging code. Is it necessary?

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+  protected void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }

Review comment:
       This method seems like it could just be inlined into the `@After` method, but also, these steps should be performed automatically by the `ClusterStartupRule`, so this shouldn't be necessary at all.

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String ZONE_A = "zoneA";
+  private static final String ZONE_B = "zoneB";
+
+  private int locatorPort;
+  private static final AtomicInteger runID = new AtomicInteger(0);
+  private String workingDir;
+
+  // 6 servers distributed evenly across 2 zones
+  private static Map<Integer, String> SERVER_ZONE_MAP;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5);
+
+  @Before
+  public void setup() {
+    // Start the locator
+    MemberVM locatorVM = clusterStartupRule.startLocatorVM(0);
+    locatorPort = locatorVM.getPort();
+
+    workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath();
+
+    runID.incrementAndGet();
+  }
+
+  @After
+  public void after() {
+    stopServersAndDeleteDirectories();
+  }
+
+  protected void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }
+
+  /**
+   * Test that we correctly use the redundancy-zone property to determine where to place redundant
+   * copies of a buckets and doesn't allow cross redundancy zone deletes. This does not use a
+   * rebalance once the servers are down.
+   *
+   */
+  @Test
+  public void testRecoveryWithOneServerPermanentlyDownAndOneRestarted() throws Exception {
+
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_A);
+        put(3, ZONE_B);
+        put(4, ZONE_B);
+      }
+    };
+
+    cleanOutServerDirectories();
+
+    // Startup the servers
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      startServerInRedundancyZone(entry.getKey(), entry.getValue());
+    }
+
+    // Put data in the server regions
+    clientPopulateServers();
+
+    // Rebalance Server VM will initiate the rebalances in this test
+    VM server2 = clusterStartupRule.getVM(2);
+
+    // Take the server 1 offline
+    clusterStartupRule.stop(1, false);
+
+    // Baseline rebalance with everything up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Stop server 3 never to start again.
+    clusterStartupRule.stop(3, true);
+
+    // Restart the server 1
+    startServerInRedundancyZone(1, SERVER_ZONE_MAP.get(1));
+
+    // Rebalance with remaining servers up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // print the bucket count on server 4 for debug. Should be 113 because server 3 is still down
+    assertThat(getBucketCount(4)).isEqualTo(113);

Review comment:
       This hard-coded number should probably be `EXPECTED_BUCKET_COUNT`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
##########
@@ -425,19 +425,58 @@ public void remoteOverRedundancyBucket(BucketRollup bucket, Member targetMember)
   }
 
   private void initLowRedundancyBuckets() {
-    this.lowRedundancyBuckets = new TreeSet<>(REDUNDANCY_COMPARATOR);
     for (BucketRollup b : this.buckets) {
       if (b != null && b.getRedundancy() >= 0 && b.getRedundancy() < this.requiredRedundancy) {
         this.lowRedundancyBuckets.add(b);
       }
     }
   }
 
+
+  /**
+   * Original functionality if bucket's redundancy is greater than what is necessary add it to the
+   * over redundancy bucket list, so it can be cleared
+   * <p>
+   * Newly added functionality is to make this so that we don't have a bucket in the same redundancy
+   * zone twice if zones are in use.
+   */
   private void initOverRedundancyBuckets() {
     this.overRedundancyBuckets = new TreeSet<>(REDUNDANCY_COMPARATOR);
+    Set<String> redundancyZonesFound = new HashSet<>();
+
+    // For every bucket
     for (BucketRollup b : this.buckets) {
-      if (b != null && b.getOnlineRedundancy() > this.requiredRedundancy) {
-        this.overRedundancyBuckets.add(b);
+      if (b != null) {
+        // check to see if the existing redundancy is greater than required
+        if (b.getOnlineRedundancy() > this.requiredRedundancy) {
+          // if so, add the bucket to the over redundancy list
+          this.overRedundancyBuckets.add(b);
+        } else {
+          // otherwise, for each member that is hosting the bucket
+          for (Member member : b.getMembersHosting()) {
+
+            // get the redundancy zone of the member
+            String redundancyZone = this.getRedundancyZone(member.getDistributedMember());
+            if (redundancyZone != null) {
+              // if the redundancy zone is not already in the list
+              if (redundancyZonesFound.contains(redundancyZone)) {
+                // add the bucket to the over redundancy list because we have more than one member
+                // with this bucket in the same zone. something we don't prefer with multiple zones
+                this.overRedundancyBuckets.add(b);
+                if (b.getOnlineRedundancy() - 1 < b.getRedundancy()) {
+                  this.lowRedundancyBuckets.add(b);
+                }

Review comment:
       This feels potentially confusing, in that a bucket could be in both the `lowRedundancyBuckets` and `overRedundancyBuckets` sets after this method is called. A developer would be forgiven for assuming that a bucket can be in only one of these sets, as their names imply that they're mutually exclusive.
   
   Also, currently the only place the `initOverRedundancyBuckets()` method is called, it's immediately followed by a call to the `initLowRedundancyBuckets()` method, which uses a different check to determine if a bucket should be added to `lowRedundancyBuckets`. Would it make sense to remove this check so that work isn't duplicated in the `initLowRedundancyBuckets()` method?
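One way to keep the two concerns separate is a helper that only answers "does this bucket have more than one copy in the same redundancy zone?" and leaves the low-redundancy decision entirely to `initLowRedundancyBuckets()`. The sketch below is plain Java under simplified assumptions: buckets are modeled as a hypothetical map from bucket id to the zone of each hosting member, rather than Geode's `BucketRollup`/`Member` types, and the class and method names are illustrative only. It does not decide which copy to remove.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class SameZoneCopies {

  /**
   * Returns the ids of buckets that have more than one copy hosted in the same redundancy zone.
   * Hypothetical model: each bucket id maps to the zone of every member hosting a copy of it
   * (null when that member has no redundancy zone configured).
   */
  static Set<Integer> bucketsWithSameZoneCopies(Map<Integer, List<String>> hostingZonesByBucket) {
    Set<Integer> result = new TreeSet<>();
    for (Map.Entry<Integer, List<String>> bucket : hostingZonesByBucket.entrySet()) {
      // Zones are tracked per bucket, so copies of other buckets never affect this one
      Set<String> zonesSeen = new HashSet<>();
      for (String zone : bucket.getValue()) {
        // Set.add returns false when this zone already hosts a copy of the bucket
        if (zone != null && !zonesSeen.add(zone)) {
          result.add(bucket.getKey());
          break;
        }
      }
    }
    return result;
  }
}
```

Note that the set of seen zones is created inside the bucket loop; declaring it once outside the loop would let zones seen for earlier buckets flag later buckets that have only one copy per zone.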

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java
##########
@@ -152,6 +155,73 @@ public void testEnforceZoneWithSixServersAndTwoZones(int rebalanceServer,
     compareZoneBucketCounts(REGION_NAME);
   }
 
+
+  /**
+   * This test tests the case that we don't accidentally leave extra copies in a
+   * redundancy zone after a server recovers.
+   *
+   */
+  @Test
+  public void testThreeZonesHaveTwoCopiesAfterRebalance() throws Exception {
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_B);
+        put(3, ZONE_C);
+      }
+    };
+
+    cleanOutServerDirectories();

Review comment:
       As with the test in `RebalanceOperationComplexPart2DistributedTest`, if this test doesn't need persistence, making the region simply `PARTITION_REDUNDANT` should simplify things a fair bit, with no need for cache XML or for manually clearing the server directories.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
##########
@@ -902,19 +927,555 @@ public void testMoveBuckets() throws UnknownHostException {
         buildDetails(member2, 500, 500, new long[] {0, 0, 0, 0}, new long[] {0, 0, 0, 0});
     model.addRegion("a", Arrays.asList(details1, details2), new FakeOfflineDetails(), true);
 
-    assertEquals(2, doMoves(new CompositeDirector(false, false, true, true), model));
+    assertThat(doMoves(new CompositeDirector(false, false, true, true), model)).isEqualTo(2);
 
-    assertEquals(Collections.emptyList(), bucketOperator.creates);
-    assertEquals(Collections.emptyList(), bucketOperator.primaryMoves);
+    assertThat(bucketOperator.creates).isEqualTo(Collections.emptyList());
+    assertThat(bucketOperator.primaryMoves).isEqualTo(Collections.emptyList());
 
     // Two of the buckets should move to member2
     List<Move> expectedMoves = new ArrayList<>();
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
 
-    assertEquals(expectedMoves, bucketOperator.bucketMoves);
+    assertThat(bucketOperator.bucketMoves).isEqualTo(expectedMoves);
+  }
+
+  @Test
+  public void testRemoveBuckets() throws UnknownHostException {
+
+    InternalDistributedMember member1 =
+        new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+    InternalDistributedMember member2 =
+        new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 2);
+    // Create some imbalanced nodes
+    PartitionMemberInfoImpl details1 =
+        buildDetails(member1, 500, 500, new long[] {1, 1, 1, 1}, new long[] {0, 1, 1, 0});
+    PartitionMemberInfoImpl details2 =
+        buildDetails(member2, 500, 500, new long[] {1, 1, 1, 1}, new long[] {1, 0, 0, 1});
+
+    PartitionedRegionLoadModel model = new PartitionedRegionLoadModel(bucketOperator, 0, 4,
+        getAddressComparor(false), Collections.emptySet(), partitionedRegion);
+    model.addRegion("a", Arrays.asList(details1, details2), new FakeOfflineDetails(), true);
+
+    assertThat(4).isEqualTo(doMoves(new CompositeDirector(true, true, true, true), model));

Review comment:
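       This assertion (and other similar ones in later tests) should probably be flipped so that the actual value is passed to `assertThat()` and the expected value to `isEqualTo()`, e.g. `assertThat(doMoves(new CompositeDirector(true, true, true, true), model)).isEqualTo(4);`.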

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java
##########
@@ -152,6 +155,73 @@ public void testEnforceZoneWithSixServersAndTwoZones(int rebalanceServer,
     compareZoneBucketCounts(REGION_NAME);
   }
 
+
+  /**
+   * This test tests the case that we don't accidentally leave extra copies in a
+   * redundancy zone after a server recovers.
+   *
+   */
+  @Test
+  public void testThreeZonesHaveTwoCopiesAfterRebalance() throws Exception {
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_B);
+        put(3, ZONE_C);
+      }
+    };

Review comment:
       The compiler warning here, which comes from the double-brace initialization creating an anonymous subclass of `HashMap`, can be fixed by using:
   ```
       SERVER_ZONE_MAP = new HashMap<>();
       SERVER_ZONE_MAP.put(1, ZONE_A);
       SERVER_ZONE_MAP.put(2, ZONE_B);
       SERVER_ZONE_MAP.put(3, ZONE_C);
   ```
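To make the difference concrete, here is a small standalone sketch (class and method names are hypothetical). The double-brace form declares an anonymous subclass of `HashMap`; since `HashMap` is `Serializable` and the subclass declares no `serialVersionUID`, the warning is most likely javac's `serial` lint. Both forms produce maps with equal contents, but only the suggested form yields a plain `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;

class ServerZoneMapInit {
  static final String ZONE_A = "zoneA";
  static final String ZONE_B = "zoneB";
  static final String ZONE_C = "zoneC";

  // Double-brace initialization: the outer braces declare an anonymous subclass of HashMap,
  // and the inner braces are that subclass's instance initializer.
  static Map<Integer, String> doubleBrace() {
    return new HashMap<Integer, String>() {
      {
        put(1, ZONE_A);
        put(2, ZONE_B);
        put(3, ZONE_C);
      }
    };
  }

  // Suggested form: a plain HashMap populated with ordinary put() calls, no extra class.
  static Map<Integer, String> plain() {
    Map<Integer, String> serverZoneMap = new HashMap<>();
    serverZoneMap.put(1, ZONE_A);
    serverZoneMap.put(2, ZONE_B);
    serverZoneMap.put(3, ZONE_C);
    return serverZoneMap;
  }
}
```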

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * The purpose of RebalanceOperationComplexDistributedTest is to test rebalances
+ * across zones and to ensure that enforceUniqueZone behavior of redundancy zones
+ * is working correctly.
+ */
+public class RebalanceOperationComplexPart2DistributedTest
+    implements Serializable {
+
+  private static final int EXPECTED_BUCKET_COUNT = 113;
+  private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
+  private static final String REGION_NAME = "primary";
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String ZONE_A = "zoneA";
+  private static final String ZONE_B = "zoneB";
+
+  private int locatorPort;
+  private static final AtomicInteger runID = new AtomicInteger(0);
+  private String workingDir;
+
+  // 6 servers distributed evenly across 2 zones
+  private static Map<Integer, String> SERVER_ZONE_MAP;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5);
+
+  @Before
+  public void setup() {
+    // Start the locator
+    MemberVM locatorVM = clusterStartupRule.startLocatorVM(0);
+    locatorPort = locatorVM.getPort();
+
+    workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath();
+
+    runID.incrementAndGet();
+  }
+
+  @After
+  public void after() {
+    stopServersAndDeleteDirectories();
+  }
+
+  protected void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }
+
+  /**
+   * Test that we correctly use the redundancy-zone property to determine where to place redundant
+   * copies of a buckets and doesn't allow cross redundancy zone deletes. This does not use a
+   * rebalance once the servers are down.
+   *
+   */
+  @Test
+  public void testRecoveryWithOneServerPermanentlyDownAndOneRestarted() throws Exception {
+
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_A);
+        put(3, ZONE_B);
+        put(4, ZONE_B);
+      }
+    };
+
+    cleanOutServerDirectories();
+
+    // Startup the servers
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      startServerInRedundancyZone(entry.getKey(), entry.getValue());
+    }
+
+    // Put data in the server regions
+    clientPopulateServers();
+
+    // Rebalance Server VM will initiate the rebalances in this test
+    VM server2 = clusterStartupRule.getVM(2);
+
+    // Take the server 1 offline
+    clusterStartupRule.stop(1, false);
+
+    // Baseline rebalance with everything up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Stop server 3 never to start again.
+    clusterStartupRule.stop(3, true);
+
+    // Restart the server 1
+    startServerInRedundancyZone(1, SERVER_ZONE_MAP.get(1));
+
+    // Rebalance with remaining servers up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // print the bucket count on server 4 for debug. Should be 113 because server 3 is still down
+    assertThat(getBucketCount(4)).isEqualTo(113);
+
+    assertThat(getBucketCount(1)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    assertThat(getBucketCount(2)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    // Verify that all bucket counts add up to what they should
+    int zoneABucketCount = getZoneBucketCount(REGION_NAME, ZONE_A);
+    assertThat(zoneABucketCount).isEqualTo(EXPECTED_BUCKET_COUNT);
+  }
+
+  private int getBucketCount(int server) {
+    return clusterStartupRule.getVM(server).invoke(() -> {
+      PartitionedRegion region =
+          (PartitionedRegion) ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      return region.getLocalBucketsListTestOnly().size();
+    });
+  }
+
+  protected void cleanOutServerDirectory(int server) {
+    VM.getVM(server).invoke(() -> {
+      String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + server;
+      File temporaryDirectory = new File(path);
+      if (temporaryDirectory.exists()) {
+        try {
+          Arrays.stream(temporaryDirectory.listFiles()).forEach(FileUtils::deleteQuietly);
+          Files.delete(temporaryDirectory.toPath());
+        } catch (Exception exception) {
+          logger.error("The delete of files or directory failed ", exception);
+          throw exception;
+        }
+      }
+    });
+  }
+
+  protected void cleanOutServerDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      cleanOutServerDirectory(entry.getKey());
+    }
+  }
+
+  /**
+   * Startup a client to put all the data in the server regions
+   */
+  protected void clientPopulateServers() throws Exception {
+    Properties properties2 = new Properties();
+    ClientVM clientVM =
+        clusterStartupRule.startClientVM(SERVER_ZONE_MAP.size() + 1, properties2,
+            ccf -> ccf.addPoolLocator("localhost", locatorPort));
+
+    clientVM.invoke(() -> {
+
+      Map<Integer, String> putMap = new HashMap<>();
+      for (int i = 0; i < 1000; i++) {
+        putMap.put(i, "A");
+      }
+
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          clientCache.createClientRegionFactory(
+              ClientRegionShortcut.PROXY);
+      clientRegionFactory.create(REGION_NAME);
+      Region<Integer, String> region = clientCache.getRegion(REGION_NAME);
+      region.putAll(putMap);
+    });
+  }
+
+  /**
+   * Startup server *index* in *redundancy zone*
+   *
+   * @param index - server
+   * @param zone - Redundancy zone for the server to be started in
+   */
+  protected void startServerInRedundancyZone(int index, final String zone) {
+
+    clusterStartupRule.startServerVM(index, s -> s
+        .withProperty(REDUNDANCY_ZONE, zone)
+        .withConnectionToLocator(locatorPort));
+
+    VM.getVM(index).invoke(() -> {
+      RegionFactory<Object, Object> regionFactory =
+          ClusterStartupRule.getCache().createRegionFactory(
+              RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);

Review comment:
       Does this bug only affect persistent regions? If not, this should probably just be a `PARTITION_REDUNDANT` region for simplicity.

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * The purpose of RebalanceOperationComplexDistributedTest is to test rebalances
+ * across zones and to ensure that enforceUniqueZone behavior of redundancy zones
+ * is working correctly.
+ */
+public class RebalanceOperationComplexPart2DistributedTest
+    implements Serializable {
+
+  private static final int EXPECTED_BUCKET_COUNT = 113;
+  private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
+  private static final String REGION_NAME = "primary";
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String ZONE_A = "zoneA";
+  private static final String ZONE_B = "zoneB";
+
+  private int locatorPort;
+  private static final AtomicInteger runID = new AtomicInteger(0);
+  private String workingDir;
+
+  // 6 servers distributed evenly across 2 zones
+  private static Map<Integer, String> SERVER_ZONE_MAP;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5);
+
+  @Before
+  public void setup() {
+    // Start the locator
+    MemberVM locatorVM = clusterStartupRule.startLocatorVM(0);
+    locatorPort = locatorVM.getPort();
+
+    workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath();
+
+    runID.incrementAndGet();
+  }
+
+  @After
+  public void after() {
+    stopServersAndDeleteDirectories();
+  }
+
+  protected void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }
+
+  /**
+   * Test that we correctly use the redundancy-zone property to determine where to place redundant
+   * copies of a buckets and doesn't allow cross redundancy zone deletes. This does not use a
+   * rebalance once the servers are down.
+   *
+   */
+  @Test
+  public void testRecoveryWithOneServerPermanentlyDownAndOneRestarted() throws Exception {
+
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_A);
+        put(3, ZONE_B);
+        put(4, ZONE_B);
+      }
+    };
+
+    cleanOutServerDirectories();
+
+    // Startup the servers
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      startServerInRedundancyZone(entry.getKey(), entry.getValue());
+    }
+
+    // Put data in the server regions
+    clientPopulateServers();
+
+    // Rebalance Server VM will initiate the rebalances in this test
+    VM server2 = clusterStartupRule.getVM(2);
+
+    // Take the server 1 offline
+    clusterStartupRule.stop(1, false);
+
+    // Baseline rebalance with everything up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Stop server 3 never to start again.
+    clusterStartupRule.stop(3, true);
+
+    // Restart the server 1
+    startServerInRedundancyZone(1, SERVER_ZONE_MAP.get(1));
+
+    // Rebalance with remaining servers up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // print the bucket count on server 4 for debug. Should be 113 because server 3 is still down
+    assertThat(getBucketCount(4)).isEqualTo(113);
+
+    assertThat(getBucketCount(1)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    assertThat(getBucketCount(2)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    // Verify that all bucket counts add up to what they should
+    int zoneABucketCount = getZoneBucketCount(REGION_NAME, ZONE_A);
+    assertThat(zoneABucketCount).isEqualTo(EXPECTED_BUCKET_COUNT);
+  }
+
+  private int getBucketCount(int server) {
+    return clusterStartupRule.getVM(server).invoke(() -> {
+      PartitionedRegion region =
+          (PartitionedRegion) ClusterStartupRule.getCache().getRegion(REGION_NAME);
+      return region.getLocalBucketsListTestOnly().size();
+    });
+  }
+
+  protected void cleanOutServerDirectory(int server) {
+    VM.getVM(server).invoke(() -> {
+      String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + server;
+      File temporaryDirectory = new File(path);
+      if (temporaryDirectory.exists()) {
+        try {
+          Arrays.stream(temporaryDirectory.listFiles()).forEach(FileUtils::deleteQuietly);
+          Files.delete(temporaryDirectory.toPath());
+        } catch (Exception exception) {
+          logger.error("The delete of files or directory failed ", exception);
+          throw exception;
+        }
+      }
+    });
+  }
+
+  protected void cleanOutServerDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      cleanOutServerDirectory(entry.getKey());
+    }
+  }
+
+  /**
+   * Startup a client to put all the data in the server regions
+   */
+  protected void clientPopulateServers() throws Exception {
+    Properties properties2 = new Properties();
+    ClientVM clientVM =
+        clusterStartupRule.startClientVM(SERVER_ZONE_MAP.size() + 1, properties2,
+            ccf -> ccf.addPoolLocator("localhost", locatorPort));
+
+    clientVM.invoke(() -> {
+
+      Map<Integer, String> putMap = new HashMap<>();
+      for (int i = 0; i < 1000; i++) {
+        putMap.put(i, "A");
+      }
+
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          clientCache.createClientRegionFactory(
+              ClientRegionShortcut.PROXY);
+      clientRegionFactory.create(REGION_NAME);
+      Region<Integer, String> region = clientCache.getRegion(REGION_NAME);
+      region.putAll(putMap);
+    });
+  }
+
+  /**
+   * Startup server *index* in *redundancy zone*
+   *
+   * @param index - server
+   * @param zone - Redundancy zone for the server to be started in
+   */
+  protected void startServerInRedundancyZone(int index, final String zone) {
+
+    clusterStartupRule.startServerVM(index, s -> s
+        .withProperty(REDUNDANCY_ZONE, zone)
+        .withConnectionToLocator(locatorPort));
+
+    VM.getVM(index).invoke(() -> {
+      RegionFactory<Object, Object> regionFactory =
+          ClusterStartupRule.getCache().createRegionFactory(
+              RegionShortcut.PARTITION_REDUNDANT_PERSISTENT);
+      PartitionAttributesImpl partitionAttributesImpl = new PartitionAttributesImpl();
+      partitionAttributesImpl.setRedundantCopies(1);
+      partitionAttributesImpl.setStartupRecoveryDelay(-1);
+      partitionAttributesImpl.setRecoveryDelay(-1);
+      partitionAttributesImpl.setTotalNumBuckets(113);
+      regionFactory.setPartitionAttributes(partitionAttributesImpl);
+      regionFactory.create(REGION_NAME);
+
+    });
+  }
+
+  /**
+   * Trigger a rebalance of buckets
+   *
+   */
+  protected void doRebalance(ResourceManager manager)
+      throws TimeoutException, InterruptedException {
+    manager.createRebalanceFactory()
+        .start().getResults(TIMEOUT_SECONDS, SECONDS);
+    assertThat(manager.getRebalanceOperations()).isEmpty();
+  }
+
+
+  /**
+   * Get the bucket count for the region in the redundancy zone
+   *
+   * @param regionName - name of the region to get the bucket count of
+   * @param zoneName - redundancy zone for which to get the bucket count
+   * @return - the total bucket count for the region in the redundancy zone
+   */
+  protected int getZoneBucketCount(String regionName, String zoneName) {
+    int bucketCount = 0;
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      if (entry.getValue().compareTo(zoneName) == 0) {
+        bucketCount +=
+            clusterStartupRule.getVM(entry.getKey()).invoke(() -> {
+              PartitionedRegion region =
+                  (PartitionedRegion) ClusterStartupRule.getCache().getRegion(regionName);
+
+              return region.getLocalBucketsListTestOnly().size();
+            });

Review comment:
       This can be simplified to:
   ```
         if (entry.getValue().equals(zoneName)) {
           bucketCount += getBucketCount(entry.getKey());
         }
   ```
   and the `regionName` argument removed from the method.
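A standalone sketch of the simplified method, assuming the same `SERVER_ZONE_MAP` layout (server index to zone name). So that it runs without a cluster, the per-server lookup is passed in as a hypothetical `bucketCountForServer` function standing in for the test's `getBucketCount(int server)`; in the test itself the loop body would just call `getBucketCount(entry.getKey())` directly, as suggested above.

```java
import java.util.Map;
import java.util.function.IntUnaryOperator;

class ZoneBucketCounter {

  /**
   * Sums the bucket counts of every server assigned to zoneName.
   * bucketCountForServer is a hypothetical stand-in for getBucketCount(int server).
   */
  static int getZoneBucketCount(Map<Integer, String> serverZoneMap, String zoneName,
      IntUnaryOperator bucketCountForServer) {
    int bucketCount = 0;
    for (Map.Entry<Integer, String> entry : serverZoneMap.entrySet()) {
      // equals() states the intent directly; compareTo(...) == 0 gives the same result for Strings
      if (entry.getValue().equals(zoneName)) {
        bucketCount += bucketCountForServer.applyAsInt(entry.getKey());
      }
    }
    return bucketCount;
  }
}
```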

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
##########
@@ -902,19 +927,555 @@ public void testMoveBuckets() throws UnknownHostException {
         buildDetails(member2, 500, 500, new long[] {0, 0, 0, 0}, new long[] {0, 0, 0, 0});
     model.addRegion("a", Arrays.asList(details1, details2), new FakeOfflineDetails(), true);
 
-    assertEquals(2, doMoves(new CompositeDirector(false, false, true, true), model));
+    assertThat(doMoves(new CompositeDirector(false, false, true, true), model)).isEqualTo(2);
 
-    assertEquals(Collections.emptyList(), bucketOperator.creates);
-    assertEquals(Collections.emptyList(), bucketOperator.primaryMoves);
+    assertThat(bucketOperator.creates).isEqualTo(Collections.emptyList());
+    assertThat(bucketOperator.primaryMoves).isEqualTo(Collections.emptyList());
 
     // Two of the buckets should move to member2
     List<Move> expectedMoves = new ArrayList<>();
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
 
-    assertEquals(expectedMoves, bucketOperator.bucketMoves);
+    assertThat(bucketOperator.bucketMoves).isEqualTo(expectedMoves);
+  }
+
+  @Test
+  public void testRemoveBuckets() throws UnknownHostException {
+
+    InternalDistributedMember member1 =
+        new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+    InternalDistributedMember member2 =
+        new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 2);
+    // Create some imbalanced nodes
+    PartitionMemberInfoImpl details1 =
+        buildDetails(member1, 500, 500, new long[] {1, 1, 1, 1}, new long[] {0, 1, 1, 0});
+    PartitionMemberInfoImpl details2 =
+        buildDetails(member2, 500, 500, new long[] {1, 1, 1, 1}, new long[] {1, 0, 0, 1});
+
+    PartitionedRegionLoadModel model = new PartitionedRegionLoadModel(bucketOperator, 0, 4,
+        getAddressComparor(false), Collections.emptySet(), partitionedRegion);
+    model.addRegion("a", Arrays.asList(details1, details2), new FakeOfflineDetails(), true);
+
+    assertThat(4).isEqualTo(doMoves(new CompositeDirector(true, true, true, true), model));
+
+    assertThat(bucketOperator.creates).isEqualTo(Collections.emptyList());
+    assertThat(bucketOperator.primaryMoves).isEqualTo(Collections.emptyList());
+    assertThat(bucketOperator.bucketMoves).isEqualTo(Collections.emptyList());
+
+    // Two of the buckets should move to member2

Review comment:
       It's not clear to me how the assertion below shows that two of the buckets move to member2. Is this comment correct? The same question applies to the similar comments in the other tests added in this PR.

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexPart2DistributedTest.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.cache.PartitionAttributesImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * The purpose of RebalanceOperationComplexDistributedTest is to test rebalances
+ * across zones and to ensure that enforceUniqueZone behavior of redundancy zones
+ * is working correctly.
+ */
+public class RebalanceOperationComplexPart2DistributedTest
+    implements Serializable {
+
+  private static final int EXPECTED_BUCKET_COUNT = 113;
+  private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
+  private static final String REGION_NAME = "primary";
+  private static final Logger logger = LogService.getLogger();
+
+  private static final String ZONE_A = "zoneA";
+  private static final String ZONE_B = "zoneB";
+
+  private int locatorPort;
+  private static final AtomicInteger runID = new AtomicInteger(0);
+  private String workingDir;
+
+  // 6 servers distributed evenly across 2 zones
+  private static Map<Integer, String> SERVER_ZONE_MAP;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(5);
+
+  @Before
+  public void setup() {
+    // Start the locator
+    MemberVM locatorVM = clusterStartupRule.startLocatorVM(0);
+    locatorPort = locatorVM.getPort();
+
+    workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath();
+
+    runID.incrementAndGet();
+  }
+
+  @After
+  public void after() {
+    stopServersAndDeleteDirectories();
+  }
+
+  protected void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }
+
+  /**
+   * Test that we correctly use the redundancy-zone property to determine where to place redundant
+   * copies of a buckets and doesn't allow cross redundancy zone deletes. This does not use a
+   * rebalance once the servers are down.
+   *
+   */
+  @Test
+  public void testRecoveryWithOneServerPermanentlyDownAndOneRestarted() throws Exception {
+
+    SERVER_ZONE_MAP = new HashMap<Integer, String>() {
+      {
+        put(1, ZONE_A);
+        put(2, ZONE_A);
+        put(3, ZONE_B);
+        put(4, ZONE_B);
+      }
+    };
+
+    cleanOutServerDirectories();
+
+    // Startup the servers
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      startServerInRedundancyZone(entry.getKey(), entry.getValue());
+    }
+
+    // Put data in the server regions
+    clientPopulateServers();
+
+    // Rebalance Server VM will initiate the rebalances in this test
+    VM server2 = clusterStartupRule.getVM(2);
+
+    // Take the server 1 offline
+    clusterStartupRule.stop(1, false);
+
+    // Baseline rebalance with everything up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Stop server 3 never to start again.
+    clusterStartupRule.stop(3, true);
+
+    // Restart the server 1
+    startServerInRedundancyZone(1, SERVER_ZONE_MAP.get(1));
+
+    // Rebalance with remaining servers up
+    server2.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // print the bucket count on server 4 for debug. Should be 113 because server 3 is still down
+    assertThat(getBucketCount(4)).isEqualTo(113);
+
+    assertThat(getBucketCount(1)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);
+
+    assertThat(getBucketCount(2)).isGreaterThanOrEqualTo(56).isLessThanOrEqualTo(57);

Review comment:
       Instead of being hard-coded, these numbers could be `EXPECTED_BUCKET_COUNT / 2` and `(EXPECTED_BUCKET_COUNT / 2) + 1`. Also, it would be good to add an assertion that the sum of the two bucket counts equals `EXPECTED_BUCKET_COUNT`, since the current assertions would still pass if both servers had 56 buckets, or both had 57.
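A minimal plain-Java sketch of why the sum check matters. It uses a hypothetical `isEvenSplit` helper that is not part of the PR. The helper encodes the suggested bounds `EXPECTED_BUCKET_COUNT / 2` and `(EXPECTED_BUCKET_COUNT / 2) + 1` plus the sum check. It returns a boolean rather than using `assert` statements, which are disabled unless the JVM runs with `-ea`.

```java
// Hypothetical helper illustrating the suggested checks; not code from the PR.
class EvenSplitCheck {
  static final int EXPECTED_BUCKET_COUNT = 113;

  /** Returns true when the two counts split total as evenly as possible. */
  static boolean isEvenSplit(int countA, int countB, int total) {
    int low = total / 2;        // 56 for 113 buckets
    int high = (total / 2) + 1; // 57 for 113 buckets
    boolean inRange = countA >= low && countA <= high && countB >= low && countB <= high;
    // The range check alone accepts 56/56 or 57/57; the sum check rules those out
    return inRange && countA + countB == total;
  }

  public static void main(String[] args) {
    System.out.println(isEvenSplit(56, 57, EXPECTED_BUCKET_COUNT)); // true
    System.out.println(isEvenSplit(56, 56, EXPECTED_BUCKET_COUNT)); // false: sum is 112
    System.out.println(isEvenSplit(57, 57, EXPECTED_BUCKET_COUNT)); // false: sum is 114
  }
}
```

In the test itself, the same checks could be written with AssertJ. Assuming hypothetical locals `count1` and `count2` holding `getBucketCount(1)` and `getBucketCount(2)`, each server would get `assertThat(count1).isBetween(EXPECTED_BUCKET_COUNT / 2, (EXPECTED_BUCKET_COUNT / 2) + 1)`. That would be followed by `assertThat(count1 + count2).isEqualTo(EXPECTED_BUCKET_COUNT)`.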




-- 
This is an automated message from the Apache Git Service.