You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2020/06/17 00:26:44 UTC

[GitHub] [geode] DonalEvans commented on a change in pull request #5249: Refactor Restore Redundancy Command

DonalEvans commented on a change in pull request #5249:
URL: https://github.com/apache/geode/pull/5249#discussion_r441065594



##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/operation/RebalanceOperationPerformer.java
##########
@@ -189,16 +189,16 @@ public static DistributedMember getAssociatedMembers(String region, final Intern
 
     String[] membersName = bean.getMembers();
     Set<DistributedMember> dsMembers = ManagementUtils.getAllMembers(cache);
-    Iterator it = dsMembers.iterator();
+    Iterator<DistributedMember> it = dsMembers.iterator();
 
     boolean matchFound = false;
 
     if (membersName.length > 1) {
       while (it.hasNext() && !matchFound) {
-        DistributedMember dsmember = (DistributedMember) it.next();
+        DistributedMember DSMember = it.next();

Review comment:
       I think that variable names should start with a lower-case letter.

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) redundancyOperation.redundancyStatus();
+      } else {
+        redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;
+      }
+      results.setSuccess(true);
+      results.setStatusMessage("Success"); // MLH change this

Review comment:
       This comment should be removed.

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) redundancyOperation.redundancyStatus();
+      } else {
+        redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;
+      }
+      results.setSuccess(true);
+      results.setStatusMessage("Success"); // MLH change this
+    } catch (Exception e) {
+      results =
+          new SerializableRestoreRedundancyResultsImpl();
+      results.setSuccess(false);
+      results.setStatusMessage(e.getMessage());
+    }

Review comment:
       I'm not sure I understand why in the case that an exception is thrown, a `SerializableRestoreRedundancyResultsImpl` is returned here instead of a `RestoreRedundancyResultsImpl`.

##########
File path: geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();
+    request.setReassignPrimaries(true);
+
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, false});
+    when(mockCache.getResourceManager().createRestoreRedundancyOperation())
+        .thenReturn(mockOperation);
+    CompletableFuture<RestoreRedundancyResults> future =
+        CompletableFuture.completedFuture(mockResults);
+    when(mockOperation.start()).thenReturn(future);
+    when(mockResults.getRegionOperationMessage()).thenReturn(message);
+    // when(mockResults.getStatusMessage()).thenReturn(message);

Review comment:
       Remove this commented out code.

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new RestoreRedundancyResultsImpl();
+        results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a member hosting this
+      // region. If one has, there is no point sending a function for this region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);

Review comment:
       There is a redundant call to `setSuccess(false)` here, since it's already been called a few lines above.

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new RestoreRedundancyResultsImpl();
+        results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a member hosting this
+      // region. If one has, there is no point sending a function for this region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());

Review comment:
       These lines appear to be redundant within the for loop, since in order to reach this point, all function results must be successful. These calls to `setSuccess()` and `setStatusMessage()` could be moved outside the for loop and only called once.

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];

Review comment:
       This variable is set in `RestoreRedundancyPerformer` using the name `checkStatus`. It might be best to have consistency between classes in terms of naming, for clarity.

##########
File path: geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();

Review comment:
       Can this `RestoreRedundancyRequest` object be replaced with a mock, to avoid testing the behaviour of both it and the `RestoreRedundancyFunction` class in this unit test?

##########
File path: geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new RestoreRedundancyResultsImpl();
+        results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a member hosting this
+      // region. If one has, there is no point sending a function for this region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults

Review comment:
       This comment does not seem entirely accurate. The method either returns `null` or a `RestoreRedundancyResults` object.

##########
File path: geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();
+    request.setReassignPrimaries(true);
+
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, false});
+    when(mockCache.getResourceManager().createRestoreRedundancyOperation())
+        .thenReturn(mockOperation);
+    CompletableFuture<RestoreRedundancyResults> future =
+        CompletableFuture.completedFuture(mockResults);
+    when(mockOperation.start()).thenReturn(future);
+    when(mockResults.getRegionOperationMessage()).thenReturn(message);
+    // when(mockResults.getStatusMessage()).thenReturn(message);
+    resultSender = mock(ResultSender.class);
+    when(mockContext.getResultSender()).thenReturn(resultSender);
+    argumentCaptor = ArgumentCaptor.forClass(SerializableRestoreRedundancyResultsImpl.class);
+  }
+
+  @Test
+  public void executeFunctionSetsFieldsOnRestoreRedundancyOperation() {
+    String[] includeRegions = {"includedRegion1", "includedRegion2"};
+    String[] excludeRegions = {"excludedRegion1", "excludedRegion2"};
+    request.setExcludeRegions(Arrays.asList(excludeRegions));
+    request.setIncludeRegions(Arrays.asList(includeRegions));
+
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(new HashSet<>(request.getIncludeRegions()));
+    verify(mockOperation).excludeRegions(new HashSet<>(request.getExcludeRegions()));
+    verify(mockOperation).shouldReassignPrimaries(request.getReassignPrimaries());
+  }
+
+  @Test
+  public void executeFunctionSetsIncludedAndExcludedRegionsOnRestoreRedundancyOperationWhenNull() {
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(null);
+    verify(mockOperation).excludeRegions(null);
+    verify(mockOperation).shouldReassignPrimaries(true);
+  }
+
+  @Test
+  public void executeFunctionUsesStatusMethodWhenIsStatusCommandIsTrue() {
+    when(mockOperation.redundancyStatus()).thenReturn(mockResults);
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.SUCCESS);
+    // isStatusCommand is the fourth argument passed to the function
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, true});
+
+    function.execute(mockContext);
+
+    verify(mockOperation, times(1)).redundancyStatus();
+    verify(mockOperation, times(0)).start();
+  }
+
+  @Test
+  public void executeFunctionReturnsErrorWhenResultStatusIsError() {
+    when(mockResults.getRegionOperationStatus()).thenReturn(RestoreRedundancyResults.Status.ERROR);
+    function.execute(mockContext);
+    verify(resultSender).lastResult(argumentCaptor.capture());
+
+    RestoreRedundancyResults result = argumentCaptor.getValue();
+    assertThat(result.getSuccess()).isFalse();
+    assertThat(result.getStatusMessage()).isEqualTo(message);
+  }
+
+  @Test
+  // The function was able to execute successfully but redundancy was not able to be established for
+  // at least one region
+  public void executeFunctionReturnsOkWhenResultStatusIsFailure() {
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.FAILURE);
+    function.execute(mockContext);
+    verify(resultSender).lastResult(argumentCaptor.capture());
+
+    SerializableRestoreRedundancyResultsImpl result = argumentCaptor.getValue();
+    verify(result).setSuccess(true);
+    assertThat(result.getRegionOperationStatus())
+        .isEqualTo(RestoreRedundancyResults.Status.FAILURE);
+    assertThat(result).isSameAs(mockResults);
+  }
+
+  @Test
+  public void executeFunctionReturnsOkWhenResultStatusIsSuccess() {
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.SUCCESS);
+    function.execute(mockContext);
+    verify(resultSender).lastResult(argumentCaptor.capture());
+
+    SerializableRestoreRedundancyResultsImpl result = argumentCaptor.getValue();
+    verify(result).setSuccess(true);
+    assertThat(result.getRegionOperationStatus())
+        .isEqualTo(RestoreRedundancyResults.Status.SUCCESS);
+    assertThat(result).isSameAs(mockResults);
+  }
+
+  @Test
+  public void whenFunctionThrowException() throws Exception {

Review comment:
       This test name could be a little more descriptive, saying what the expected behaviour is given the test conditions, such as "executeFunctionReturnsFailureResultWhenExceptionIsThrownDuringOperation". Also, an exception is never thrown from this method, so the `throws Exception` can be removed from the method signature.

##########
File path: geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();
+    request.setReassignPrimaries(true);
+
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, false});
+    when(mockCache.getResourceManager().createRestoreRedundancyOperation())
+        .thenReturn(mockOperation);
+    CompletableFuture<RestoreRedundancyResults> future =
+        CompletableFuture.completedFuture(mockResults);
+    when(mockOperation.start()).thenReturn(future);
+    when(mockResults.getRegionOperationMessage()).thenReturn(message);
+    // when(mockResults.getStatusMessage()).thenReturn(message);
+    resultSender = mock(ResultSender.class);
+    when(mockContext.getResultSender()).thenReturn(resultSender);
+    argumentCaptor = ArgumentCaptor.forClass(SerializableRestoreRedundancyResultsImpl.class);
+  }
+
+  @Test
+  public void executeFunctionSetsFieldsOnRestoreRedundancyOperation() {
+    String[] includeRegions = {"includedRegion1", "includedRegion2"};
+    String[] excludeRegions = {"excludedRegion1", "excludedRegion2"};
+    request.setExcludeRegions(Arrays.asList(excludeRegions));
+    request.setIncludeRegions(Arrays.asList(includeRegions));
+
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(new HashSet<>(request.getIncludeRegions()));
+    verify(mockOperation).excludeRegions(new HashSet<>(request.getExcludeRegions()));
+    verify(mockOperation).shouldReassignPrimaries(request.getReassignPrimaries());
+  }
+
+  @Test
+  public void executeFunctionSetsIncludedAndExcludedRegionsOnRestoreRedundancyOperationWhenNull() {
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(null);
+    verify(mockOperation).excludeRegions(null);
+    verify(mockOperation).shouldReassignPrimaries(true);
+  }
+
+  @Test
+  public void executeFunctionUsesStatusMethodWhenIsStatusCommandIsTrue() {
+    when(mockOperation.redundancyStatus()).thenReturn(mockResults);
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.SUCCESS);
+    // isStatusCommand is the fourth argument passed to the function

Review comment:
       This comment is no longer correct. The argument that controls whether or not the function should restore redundancy or just check the redundancy status is now the second argument. Also, see the comment in `RestoreRedundancyFunction` regarding the name of this variable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org