You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/01 21:03:38 UTC
[44/50] [abbrv] geode git commit: GEODE-3256: Refactoring DataCommands
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ImportDataCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ImportDataCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ImportDataCommand.java
new file mode 100644
index 0000000..2308405
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/ImportDataCommand.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.List;
+
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.cli.CliUtil;
+import org.apache.geode.management.internal.cli.functions.ImportDataFunction;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.result.ResultBuilder;
+
+public class ImportDataCommand implements GfshCommand {
+ private final ImportDataFunction importDataFunction = new ImportDataFunction();
+
+ @CliCommand(value = CliStrings.IMPORT_DATA, help = CliStrings.IMPORT_DATA__HELP)
+ @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+ public Result importData(
+ @CliOption(key = CliStrings.IMPORT_DATA__REGION, optionContext = ConverterHint.REGION_PATH,
+ mandatory = true, help = CliStrings.IMPORT_DATA__REGION__HELP) String regionName,
+ @CliOption(key = CliStrings.IMPORT_DATA__FILE, mandatory = true,
+ help = CliStrings.IMPORT_DATA__FILE__HELP) String filePath,
+ @CliOption(key = CliStrings.MEMBER, mandatory = true,
+ optionContext = ConverterHint.MEMBERIDNAME,
+ help = CliStrings.IMPORT_DATA__MEMBER__HELP) String memberNameOrId,
+ @CliOption(key = CliStrings.IMPORT_DATA__INVOKE_CALLBACKS, unspecifiedDefaultValue = "false",
+ help = CliStrings.IMPORT_DATA__INVOKE_CALLBACKS__HELP) boolean invokeCallbacks) {
+
+ getSecurityService().authorizeRegionWrite(regionName);
+
+ Result result;
+
+ try {
+ final DistributedMember targetMember = CliUtil.getDistributedMemberByNameOrId(memberNameOrId);
+
+ if (!filePath.endsWith(CliStrings.GEODE_DATA_FILE_EXTENSION)) {
+ return ResultBuilder.createUserErrorResult(CliStrings
+ .format(CliStrings.INVALID_FILE_EXTENSION, CliStrings.GEODE_DATA_FILE_EXTENSION));
+ }
+ if (targetMember != null) {
+ final Object args[] = {regionName, filePath, invokeCallbacks};
+ ResultCollector<?, ?> rc = CliUtil.executeFunction(importDataFunction, args, targetMember);
+ List<Object> results = (List<Object>) rc.getResult();
+
+ if (results != null) {
+ Object resultObj = results.get(0);
+
+ if (resultObj instanceof String) {
+ result = ResultBuilder.createInfoResult((String) resultObj);
+ } else if (resultObj instanceof Exception) {
+ result = ResultBuilder.createGemFireErrorResult(((Exception) resultObj).getMessage());
+ } else {
+ result = ResultBuilder.createGemFireErrorResult(
+ CliStrings.format(CliStrings.COMMAND_FAILURE_MESSAGE, CliStrings.IMPORT_DATA));
+ }
+ } else {
+ result = ResultBuilder.createGemFireErrorResult(
+ CliStrings.format(CliStrings.COMMAND_FAILURE_MESSAGE, CliStrings.IMPORT_DATA));
+ }
+ } else {
+ result = ResultBuilder.createUserErrorResult(
+ CliStrings.format(CliStrings.IMPORT_DATA__MEMBER__NOT__FOUND, memberNameOrId));
+ }
+ } catch (CacheClosedException e) {
+ result = ResultBuilder.createGemFireErrorResult(e.getMessage());
+ } catch (FunctionInvocationTargetException e) {
+ result = ResultBuilder.createGemFireErrorResult(
+ CliStrings.format(CliStrings.COMMAND_FAILURE_MESSAGE, CliStrings.IMPORT_DATA));
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/LocateEntryCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/LocateEntryCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/LocateEntryCommand.java
new file mode 100644
index 0000000..f4caf13
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/LocateEntryCommand.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.callFunctionForRegion;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.getRegionAssociatedMembers;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.makePresentationResult;
+
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.cli.domain.DataCommandRequest;
+import org.apache.geode.management.internal.cli.domain.DataCommandResult;
+import org.apache.geode.management.internal.cli.functions.DataCommandFunction;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+
+public class LocateEntryCommand implements GfshCommand {
+ @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+ @CliCommand(value = {CliStrings.LOCATE_ENTRY}, help = CliStrings.LOCATE_ENTRY__HELP)
+ public Result locateEntry(
+ @CliOption(key = {CliStrings.LOCATE_ENTRY__KEY}, mandatory = true,
+ help = CliStrings.LOCATE_ENTRY__KEY__HELP) String key,
+ @CliOption(key = {CliStrings.LOCATE_ENTRY__REGIONNAME}, mandatory = true,
+ help = CliStrings.LOCATE_ENTRY__REGIONNAME__HELP,
+ optionContext = ConverterHint.REGION_PATH) String regionPath,
+ @CliOption(key = {CliStrings.LOCATE_ENTRY__KEYCLASS},
+ help = CliStrings.LOCATE_ENTRY__KEYCLASS__HELP) String keyClass,
+ @CliOption(key = {CliStrings.LOCATE_ENTRY__VALUEKLASS},
+ help = CliStrings.LOCATE_ENTRY__VALUEKLASS__HELP) String valueClass,
+ @CliOption(key = {CliStrings.LOCATE_ENTRY__RECURSIVE},
+ help = CliStrings.LOCATE_ENTRY__RECURSIVE__HELP,
+ unspecifiedDefaultValue = "false") boolean recursive) {
+
+ getSecurityService().authorizeRegionRead(regionPath, key);
+
+ DataCommandResult dataResult;
+
+ if (StringUtils.isEmpty(regionPath)) {
+ return makePresentationResult(DataCommandResult.createLocateEntryResult(key, null, null,
+ CliStrings.LOCATE_ENTRY__MSG__REGIONNAME_EMPTY, false));
+ }
+
+ if (StringUtils.isEmpty(key)) {
+ return makePresentationResult(DataCommandResult.createLocateEntryResult(key, null, null,
+ CliStrings.LOCATE_ENTRY__MSG__KEY_EMPTY, false));
+ }
+
+ DataCommandFunction locateEntry = new DataCommandFunction();
+ Set<DistributedMember> memberList = getRegionAssociatedMembers(regionPath, getCache(), true);
+ if (CollectionUtils.isNotEmpty(memberList)) {
+ DataCommandRequest request = new DataCommandRequest();
+ request.setCommand(CliStrings.LOCATE_ENTRY);
+ request.setKey(key);
+ request.setKeyClass(keyClass);
+ request.setRegionName(regionPath);
+ request.setValueClass(valueClass);
+ request.setRecursive(recursive);
+ dataResult = callFunctionForRegion(request, locateEntry, memberList);
+ } else {
+ dataResult = DataCommandResult.createLocateEntryInfoResult(key, null, null, CliStrings.format(
+ CliStrings.LOCATE_ENTRY__MSG__REGION_NOT_FOUND_ON_ALL_MEMBERS, regionPath), false);
+ }
+ dataResult.setKeyClass(keyClass);
+ if (valueClass != null) {
+ dataResult.setValueClass(valueClass);
+ }
+
+ return makePresentationResult(dataResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/PutCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/PutCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/PutCommand.java
new file mode 100644
index 0000000..3571f6d
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/PutCommand.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.callFunctionForRegion;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.getRegionAssociatedMembers;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.makePresentationResult;
+
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.cli.domain.DataCommandRequest;
+import org.apache.geode.management.internal.cli.domain.DataCommandResult;
+import org.apache.geode.management.internal.cli.functions.DataCommandFunction;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+
+/**
+ * gfsh command that adds or updates a single entry in a region.
+ */
+public class PutCommand implements GfshCommand {
+
+  /**
+   * Puts {@code value} under {@code key} in {@code regionPath}.
+   *
+   * <p>
+   * When the region exists in this member's cache the put is done directly through a
+   * {@link DataCommandFunction}; otherwise the function is executed on the members associated with
+   * the region.
+   *
+   * @param key key of the entry; must not be empty
+   * @param value value of the entry; must not be empty
+   * @param regionPath region to put into; must not be empty
+   * @param keyClass optional class name used to interpret the key
+   * @param valueClass optional class name used to interpret the value
+   * @param putIfAbsent forwarded to the put operation as its put-if-absent flag
+   * @return the presentation form of the put result
+   */
+  @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+  @CliCommand(value = {CliStrings.PUT}, help = CliStrings.PUT__HELP)
+  public Result put(
+      @CliOption(key = {CliStrings.PUT__KEY}, mandatory = true,
+          help = CliStrings.PUT__KEY__HELP) String key,
+      @CliOption(key = {CliStrings.PUT__VALUE}, mandatory = true,
+          help = CliStrings.PUT__VALUE__HELP) String value,
+      @CliOption(key = {CliStrings.PUT__REGIONNAME}, mandatory = true,
+          help = CliStrings.PUT__REGIONNAME__HELP,
+          optionContext = ConverterHint.REGION_PATH) String regionPath,
+      @CliOption(key = {CliStrings.PUT__KEYCLASS},
+          help = CliStrings.PUT__KEYCLASS__HELP) String keyClass,
+      @CliOption(key = {CliStrings.PUT__VALUEKLASS},
+          help = CliStrings.PUT__VALUEKLASS__HELP) String valueClass,
+      @CliOption(key = {CliStrings.PUT__PUTIFABSENT}, help = CliStrings.PUT__PUTIFABSENT__HELP,
+          unspecifiedDefaultValue = "false") boolean putIfAbsent) {
+
+    InternalCache cache = getCache();
+    cache.getSecurityService().authorizeRegionWrite(regionPath);
+
+    if (StringUtils.isEmpty(regionPath)) {
+      return makePresentationResult(DataCommandResult.createPutResult(key, null, null,
+          CliStrings.PUT__MSG__REGIONNAME_EMPTY, false));
+    }
+
+    if (StringUtils.isEmpty(key)) {
+      return makePresentationResult(DataCommandResult.createPutResult(key, null, null,
+          CliStrings.PUT__MSG__KEY_EMPTY, false));
+    }
+
+    if (StringUtils.isEmpty(value)) {
+      // Report the key the user supplied; the empty value used to be passed in the key position,
+      // unlike every other createPut*Result call in this command.
+      return makePresentationResult(DataCommandResult.createPutResult(key, null, null,
+          CliStrings.PUT__MSG__VALUE_EMPTY, false));
+    }
+
+    Region<?, ?> region = cache.getRegion(regionPath);
+    DataCommandFunction putfn = new DataCommandFunction();
+    DataCommandResult dataResult;
+    if (region != null) {
+      // Region is available in this member's cache: perform the put directly.
+      dataResult = putfn.put(key, value, putIfAbsent, keyClass, valueClass, regionPath);
+    } else {
+      // Reuse the cache obtained above instead of looking it up a second time.
+      Set<DistributedMember> memberList = getRegionAssociatedMembers(regionPath, cache, false);
+      if (CollectionUtils.isNotEmpty(memberList)) {
+        DataCommandRequest request = new DataCommandRequest();
+        request.setCommand(CliStrings.PUT);
+        request.setValue(value);
+        request.setKey(key);
+        request.setKeyClass(keyClass);
+        request.setRegionName(regionPath);
+        request.setValueClass(valueClass);
+        request.setPutIfAbsent(putIfAbsent);
+        dataResult = callFunctionForRegion(request, putfn, memberList);
+      } else {
+        dataResult = DataCommandResult.createPutInfoResult(key, value, null,
+            CliStrings.format(CliStrings.PUT__MSG__REGION_NOT_FOUND_ON_ALL_MEMBERS, regionPath),
+            false);
+      }
+    }
+
+    dataResult.setKeyClass(keyClass);
+    if (valueClass != null) {
+      dataResult.setValueClass(valueClass);
+    }
+    return makePresentationResult(dataResult);
+  }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/QueryCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/QueryCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/QueryCommand.java
index 513cd65..3c039b3 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/QueryCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/QueryCommand.java
@@ -18,6 +18,8 @@ package org.apache.geode.management.internal.cli.commands;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
@@ -27,6 +29,8 @@ import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.internal.CompiledValue;
import org.apache.geode.cache.query.internal.QCompiler;
@@ -71,7 +75,7 @@ public class QueryCommand implements GfshCommand {
return dataResult;
}
- Object array[] = DataCommands.replaceGfshEnvVar(query, CommandExecutionContext.getShellEnv());
+ Object array[] = replaceGfshEnvVar(query, CommandExecutionContext.getShellEnv());
query = (String) array[1];
boolean limitAdded = false;
@@ -97,7 +101,7 @@ public class QueryCommand implements GfshCommand {
regionsInQuery = Collections.unmodifiableSet(regions);
if (regionsInQuery.size() > 0) {
Set<DistributedMember> members =
- DataCommands.getQueryRegionsAssociatedMembers(regionsInQuery, cache, false);
+ DataCommandsUtils.getQueryRegionsAssociatedMembers(regionsInQuery, cache, false);
if (members != null && members.size() > 0) {
DataCommandFunction function = new DataCommandFunction();
DataCommandRequest request = new DataCommandRequest();
@@ -107,7 +111,7 @@ public class QueryCommand implements GfshCommand {
if (subject != null) {
request.setPrincipal(subject.getPrincipal());
}
- dataResult = DataCommands.callFunctionForRegion(request, function, members);
+ dataResult = callFunctionForRegion(request, function, members);
dataResult.setInputQuery(query);
if (limitAdded) {
dataResult.setLimit(CommandExecutionContext.getShellFetchSize());
@@ -129,4 +133,75 @@ public class QueryCommand implements GfshCommand {
CliStrings.format(CliStrings.QUERY__MSG__INVALID_QUERY, qe.getMessage()), false);
}
}
+
+  /**
+   * Executes {@code putfn} with {@code request} as its argument on the given members and reduces
+   * the replies to a single {@link DataCommandResult}.
+   *
+   * <p>
+   * With exactly one member only the first reply is looked at. With any other member count the
+   * replies are aggregated into the first one, in the order the collector returns them. In both
+   * cases a {@link Throwable} reply short-circuits into an error result.
+   *
+   * @param request argument handed to the function
+   * @param putfn function to execute
+   * @param members target members
+   * @return the single or aggregated result; {@code null} if several members were targeted and
+   *         the collector returned no replies
+   */
+  public static DataCommandResult callFunctionForRegion(DataCommandRequest request,
+      DataCommandFunction putfn, Set<DistributedMember> members) {
+
+    if (members.size() != 1) {
+      ResultCollector<?, ?> multiCollector =
+          FunctionService.onMembers(members).setArguments(request).execute(putfn);
+      List<?> replies = (List<?>) multiCollector.getResult();
+
+      DataCommandResult combined = null;
+      for (Object reply : replies) {
+        if (reply instanceof Throwable) {
+          return errorResultFor((Throwable) reply);
+        }
+        DataCommandResult partial = (DataCommandResult) reply;
+        if (combined != null) {
+          combined.aggregate(partial);
+        } else {
+          combined = partial;
+          combined.aggregate(null);
+        }
+      }
+      return combined;
+    }
+
+    DistributedMember onlyMember = members.iterator().next();
+    ResultCollector<?, ?> singleCollector =
+        FunctionService.onMember(onlyMember).setArguments(request).execute(putfn);
+    List<?> replies = (List<?>) singleCollector.getResult();
+
+    Object firstReply = replies.get(0);
+    if (firstReply instanceof Throwable) {
+      return errorResultFor((Throwable) firstReply);
+    }
+    DataCommandResult single = (DataCommandResult) firstReply;
+    single.aggregate(null);
+    return single;
+  }
+
+  /** Wraps a failure returned by a member into a fresh error-carrying result. */
+  private static DataCommandResult errorResultFor(Throwable failure) {
+    DataCommandResult failed = new DataCommandResult();
+    failed.setErorr(failure);
+    failed.setErrorString(failure.getMessage());
+    return failed;
+  }
+
+  /**
+   * Substitutes gfsh environment variables written as <code>${name}</code> in {@code query}.
+   *
+   * <p>
+   * Placeholders are processed left to right. When a variable has a value in
+   * {@code gfshEnvVarMap}, every remaining occurrence of that placeholder is replaced with the
+   * value, taken literally (no regular-expression or back-reference interpretation). Scanning
+   * then resumes right after the inserted value, so substituted text is never expanded again and
+   * a placeholder directly following a replaced one is still found. Placeholders without a value
+   * are left untouched.
+   *
+   * @param query text that may contain placeholders
+   * @param gfshEnvVarMap variable names mapped to their values; {@code null} or empty means there
+   *        is nothing to substitute
+   * @return a two-element array: index 0 holds the number of variables that were substituted (an
+   *         {@code Integer}), index 1 the resulting query string
+   */
+  private static Object[] replaceGfshEnvVar(String query, Map<String, String> gfshEnvVarMap) {
+    int replacedVars = 0;
+    if (gfshEnvVarMap == null || gfshEnvVarMap.isEmpty()) {
+      return new Object[] {replacedVars, query};
+    }
+
+    int startIndex = 0;
+    while (startIndex < query.length()) {
+      int open = query.indexOf("${", startIndex);
+      if (open == -1) {
+        break;
+      }
+      int close = query.indexOf("}", open);
+      if (close == -1) {
+        break;
+      }
+
+      String var = query.substring(open + 2, close);
+      String value = gfshEnvVarMap.get(var);
+      if (value == null) {
+        // Unknown variable: keep the placeholder and continue behind it.
+        startIndex = close + 1;
+        continue;
+      }
+
+      // Only the text from the placeholder onwards is rewritten, so 'open' stays a valid index
+      // into the new string. String.replace works on literals, unlike replaceAll.
+      String placeholder = query.substring(open, close + 1);
+      query = query.substring(0, open) + query.substring(open).replace(placeholder, value);
+      replacedVars++;
+      startIndex = open + value.length();
+    }
+    return new Object[] {replacedVars, query};
+  }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RebalanceCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RebalanceCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RebalanceCommand.java
new file mode 100644
index 0000000..85801ba
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RebalanceCommand.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.checkResultList;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.toCompositeResultData;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.tokenize;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.RebalanceOperation;
+import org.apache.geode.cache.control.RebalanceResults;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.partition.PartitionRebalanceInfo;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.DistributedRegionMXBean;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.management.internal.cli.CliUtil;
+import org.apache.geode.management.internal.cli.LogWrapper;
+import org.apache.geode.management.internal.cli.functions.RebalanceFunction;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.result.CompositeResultData;
+import org.apache.geode.management.internal.cli.result.ErrorResultData;
+import org.apache.geode.management.internal.cli.result.ResultBuilder;
+import org.apache.geode.management.internal.cli.result.TabularResultData;
+import org.apache.geode.management.internal.security.ResourceOperation;
+import org.apache.geode.security.ResourcePermission;
+
+public class RebalanceCommand implements GfshCommand {
+  /**
+   * Rebalances partitioned regions, optionally restricted to or excluding specific regions.
+   *
+   * <p>
+   * The work runs on a dedicated single-thread executor so that the command can stop waiting
+   * after {@code timeout} seconds while the rebalance itself keeps running.
+   *
+   * @param includeRegions regions to rebalance; when empty the whole distributed system is
+   *        considered
+   * @param excludeRegions regions to leave out
+   * @param timeout seconds to wait for the result; zero or negative waits until completion
+   * @param simulate when {@code true} the rebalance is only simulated
+   * @return the rebalance result, an info result if the timeout elapsed first, or an error result
+   *         if waiting for the task failed
+   */
+  @CliCommand(value = CliStrings.REBALANCE, help = CliStrings.REBALANCE__HELP)
+  @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+  @ResourceOperation(resource = ResourcePermission.Resource.DATA,
+      operation = ResourcePermission.Operation.MANAGE)
+  public Result rebalance(
+      @CliOption(key = CliStrings.REBALANCE__INCLUDEREGION,
+          help = CliStrings.REBALANCE__INCLUDEREGION__HELP) String[] includeRegions,
+      @CliOption(key = CliStrings.REBALANCE__EXCLUDEREGION,
+          help = CliStrings.REBALANCE__EXCLUDEREGION__HELP) String[] excludeRegions,
+      @CliOption(key = CliStrings.REBALANCE__TIMEOUT, unspecifiedDefaultValue = "-1",
+          help = CliStrings.REBALANCE__TIMEOUT__HELP) long timeout,
+      @CliOption(key = CliStrings.REBALANCE__SIMULATE, specifiedDefaultValue = "true",
+          unspecifiedDefaultValue = "false",
+          help = CliStrings.REBALANCE__SIMULATE__HELP) boolean simulate) {
+
+    ExecutorService commandExecutors = Executors.newSingleThreadExecutor();
+    Result result;
+    try {
+      Future<Result> fs = commandExecutors
+          .submit(new ExecuteRebalanceWithTimeout(includeRegions, excludeRegions, simulate));
+
+      if (timeout > 0) {
+        result = fs.get(timeout, TimeUnit.SECONDS);
+      } else {
+        result = fs.get();
+      }
+    } catch (TimeoutException timeoutException) {
+      result = ResultBuilder.createInfoResult(CliStrings.REBALANCE__MSG__REBALANCE_WILL_CONTINUE);
+
+    } catch (Exception ex) {
+      if (ex instanceof InterruptedException) {
+        // Keep the interrupt visible to whoever owns this thread.
+        Thread.currentThread().interrupt();
+      }
+      result = ResultBuilder.createGemFireErrorResult(CliStrings.format(
+          CliStrings.REBALANCE__MSG__EXCEPTION_OCCURRED_WHILE_REBALANCING_0, ex.getMessage()));
+    } finally {
+      // shutdown() does not cancel the already submitted task: a rebalance that outlived the
+      // timeout keeps running, and the worker thread exits once it is done instead of lingering
+      // for the lifetime of the JVM.
+      commandExecutors.shutdown();
+    }
+    LogWrapper.getInstance().info("Rebalance returning result >>>" + result);
+    return result;
+  }
+
+ // TODO EY Move this to its own class
+ private class ExecuteRebalanceWithTimeout implements Callable<Result> {
+ String[] includeRegions = null;
+ String[] excludeRegions = null;
+ boolean simulate;
+ InternalCache cache = getCache();
+
+ @Override
+ public Result call() throws Exception {
+ return executeRebalanceWithTimeout(includeRegions, excludeRegions, simulate);
+ }
+
+ ExecuteRebalanceWithTimeout(String[] includedRegions, String[] excludedRegions,
+ boolean toSimulate) {
+ includeRegions = includedRegions;
+ excludeRegions = excludedRegions;
+ simulate = toSimulate;
+ }
+
+ Result executeRebalanceWithTimeout(String[] includeRegions, String[] excludeRegions,
+ boolean simulate) {
+
+ Result result = null;
+ try {
+ RebalanceOperation op;
+
+ if (ArrayUtils.isNotEmpty(includeRegions)) {
+ CompositeResultData rebalanceResultData = ResultBuilder.createCompositeResultData();
+ int index = 0;
+
+ for (String regionName : includeRegions) {
+
+ // To be removed after region Name specification with "/" is fixed
+ regionName = regionName.startsWith("/") ? regionName : ("/" + regionName);
+ Region region = cache.getRegion(regionName);
+
+ if (region == null) {
+ DistributedMember member = getAssociatedMembers(regionName, cache);
+
+ if (member == null) {
+ LogWrapper.getInstance().info(CliStrings.format(
+ CliStrings.REBALANCE__MSG__NO_ASSOCIATED_DISTRIBUTED_MEMBER, regionName));
+ continue;
+ }
+
+ Function rebalanceFunction = new RebalanceFunction();
+ Object[] functionArgs = new Object[3];
+ functionArgs[0] = simulate ? "true" : "false";
+ Set<String> setRegionName = new HashSet<>();
+ setRegionName.add(regionName);
+ functionArgs[1] = setRegionName;
+
+ Set<String> excludeRegionSet = new HashSet<>();
+ if (ArrayUtils.isNotEmpty(excludeRegions)) {
+ Collections.addAll(excludeRegionSet, excludeRegions);
+ }
+ functionArgs[2] = excludeRegionSet;
+
+ if (simulate) {
+ List resultList;
+ try {
+ resultList = (ArrayList) CliUtil
+ .executeFunction(rebalanceFunction, functionArgs, member).getResult();
+ } catch (Exception ex) {
+ LogWrapper.getInstance()
+ .info(CliStrings.format(
+ CliStrings.REBALANCE__MSG__EXCEPTION_IN_REBALANCE_FOR_MEMBER_0_Exception_1,
+ member.getId(), ex.getMessage()), ex);
+ rebalanceResultData.addSection()
+ .addData(CliStrings.format(
+ CliStrings.REBALANCE__MSG__EXCEPTION_IN_REBALANCE_FOR_MEMBER_0_Exception,
+ member.getId()), ex.getMessage());
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ continue;
+ }
+
+ if (checkResultList(rebalanceResultData, resultList, member)) {
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ continue;
+ }
+ List<String> rstList = tokenize((String) resultList.get(0), ",");
+
+ result = ResultBuilder.buildResult(toCompositeResultData(rebalanceResultData,
+ (ArrayList) rstList, index, true, cache));
+ } else {
+ List resultList;
+ try {
+ resultList = (ArrayList) CliUtil
+ .executeFunction(rebalanceFunction, functionArgs, member).getResult();
+ } catch (Exception ex) {
+ LogWrapper.getInstance()
+ .info(CliStrings.format(
+ CliStrings.REBALANCE__MSG__EXCEPTION_IN_REBALANCE_FOR_MEMBER_0_Exception_1,
+ member.getId(), ex.getMessage()), ex);
+ rebalanceResultData.addSection()
+ .addData(CliStrings.format(
+ CliStrings.REBALANCE__MSG__EXCEPTION_IN_REBALANCE_FOR_MEMBER_0_Exception,
+ member.getId()), ex.getMessage());
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ continue;
+ }
+
+ if (checkResultList(rebalanceResultData, resultList, member)) {
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ continue;
+ }
+ List<String> rstList = tokenize((String) resultList.get(0), ",");
+
+ result = ResultBuilder.buildResult(toCompositeResultData(rebalanceResultData,
+ (ArrayList) rstList, index, false, cache));
+ }
+
+ } else {
+
+ ResourceManager manager = cache.getResourceManager();
+ RebalanceFactory rbFactory = manager.createRebalanceFactory();
+ Set<String> excludeRegionSet = new HashSet<>();
+ if (excludeRegions != null) {
+ Collections.addAll(excludeRegionSet, excludeRegions);
+ }
+ rbFactory.excludeRegions(excludeRegionSet);
+ Set<String> includeRegionSet = new HashSet<>();
+ includeRegionSet.add(regionName);
+ rbFactory.includeRegions(includeRegionSet);
+
+ if (simulate) {
+ op = manager.createRebalanceFactory().simulate();
+ result = ResultBuilder.buildResult(buildResultForRebalance(rebalanceResultData,
+ op.getResults(), index, true, cache));
+
+ } else {
+ op = manager.createRebalanceFactory().start();
+ // Wait until the rebalance is complete and then get the results
+ result = ResultBuilder.buildResult(buildResultForRebalance(rebalanceResultData,
+ op.getResults(), index, false, cache));
+ }
+ }
+ index++;
+ }
+ LogWrapper.getInstance().info("Rebalance returning result " + result);
+ return result;
+ } else {
+ result = executeRebalanceOnDS(cache, String.valueOf(simulate), excludeRegions);
+ LogWrapper.getInstance().info("Starting Rebalance simulate false result >> " + result);
+ }
+ } catch (Exception e) {
+ result = ResultBuilder.createGemFireErrorResult(e.getMessage());
+ }
+ LogWrapper.getInstance().info("Rebalance returning result >>>" + result);
+ return result;
+ }
+ }
+
+  /**
+   * Finds one distributed member on which the given region is reported to exist.
+   *
+   * <p>
+   * The member names come from the region's {@link DistributedRegionMXBean}; the first member of
+   * the distributed system whose name or id equals one of them is returned.
+   *
+   * @param region path of the region
+   * @param cache cache used to reach the management service and the member list
+   * @return a matching member, or {@code null} if the region has no MXBean, no member matches, or
+   *         the bean reports fewer than two members
+   */
+  private DistributedMember getAssociatedMembers(String region, final InternalCache cache) {
+    DistributedRegionMXBean regionBean =
+        ManagementService.getManagementService(cache).getDistributedRegionMXBean(region);
+    if (regionBean == null) {
+      return null;
+    }
+
+    String[] hostingMemberNames = regionBean.getMembers();
+    Iterator<DistributedMember> candidates = CliUtil.getAllMembers(cache).iterator();
+
+    // NOTE(review): a region reported on a single member is never matched (the check is
+    // "more than one"), presumably because there is nothing to rebalance then - confirm.
+    if (hostingMemberNames.length <= 1) {
+      return null;
+    }
+
+    while (candidates.hasNext()) {
+      DistributedMember candidate = candidates.next();
+      String candidateName = MBeanJMXAdapter.getMemberNameOrId(candidate);
+      for (String hostingMemberName : hostingMemberNames) {
+        if (candidateName.equals(hostingMemberName)) {
+          return candidate;
+        }
+      }
+    }
+    return null;
+  }
+
+ /**
+  * Adds the totals of one rebalance run as a new table section of the composite result and
+  * writes the same figures to the cache logger.
+  * <p>
+  * Nothing is added or logged when the results carry no partition details, or when the first
+  * detail entry has an empty region path.
+  *
+  * @param rebalanceResultData composite result that receives the new section
+  * @param results rebalance results the totals are read from
+  * @param index number appended to {@code "Table"} to name the new table
+  * @param simulate selects the "Simulated" instead of the "Rebalanced" header text
+  * @param cache cache whose logger receives the summary
+  * @return the {@code rebalanceResultData} instance that was passed in
+  */
+ private CompositeResultData buildResultForRebalance(CompositeResultData rebalanceResultData,
+     RebalanceResults results, int index, boolean simulate, InternalCache cache) {
+   Set<PartitionRebalanceInfo> regions = results.getPartitionRebalanceDetails();
+
+   // add only if there are valid number of regions
+   if (regions.isEmpty()
+       || !StringUtils.isNotEmpty(regions.iterator().next().getRegionPath())) {
+     return rebalanceResultData;
+   }
+
+   final TabularResultData table = rebalanceResultData.addSection().addTable("Table" + index);
+   final String newLine = System.getProperty("line.separator");
+   final StringBuilder logText = new StringBuilder();
+   logText.append(newLine);
+
+   addRebalanceStat(table, logText, newLine, CliStrings.REBALANCE__MSG__TOTALBUCKETCREATEBYTES,
+       results.getTotalBucketCreateBytes());
+   addRebalanceStat(table, logText, newLine, CliStrings.REBALANCE__MSG__TOTALBUCKETCREATETIM,
+       results.getTotalBucketCreateTime());
+   addRebalanceStat(table, logText, newLine,
+       CliStrings.REBALANCE__MSG__TOTALBUCKETCREATESCOMPLETED,
+       results.getTotalBucketCreatesCompleted());
+   addRebalanceStat(table, logText, newLine, CliStrings.REBALANCE__MSG__TOTALBUCKETTRANSFERBYTES,
+       results.getTotalBucketTransferBytes());
+   addRebalanceStat(table, logText, newLine, CliStrings.REBALANCE__MSG__TOTALBUCKETTRANSFERTIME,
+       results.getTotalBucketTransferTime());
+   addRebalanceStat(table, logText, newLine,
+       CliStrings.REBALANCE__MSG__TOTALBUCKETTRANSFERSCOMPLETED,
+       results.getTotalBucketTransfersCompleted());
+   addRebalanceStat(table, logText, newLine, CliStrings.REBALANCE__MSG__TOTALPRIMARYTRANSFERTIME,
+       results.getTotalPrimaryTransferTime());
+   addRebalanceStat(table, logText, newLine,
+       CliStrings.REBALANCE__MSG__TOTALPRIMARYTRANSFERSCOMPLETED,
+       results.getTotalPrimaryTransfersCompleted());
+   addRebalanceStat(table, logText, newLine, CliStrings.REBALANCE__MSG__TOTALTIME,
+       results.getTotalTime());
+
+   // header: fixed prefix followed by every region path, each preceded by one extra blank
+   StringBuilder header = new StringBuilder();
+   header.append(simulate ? "Simulated partition regions " : "Rebalanced partition regions ");
+   for (PartitionRebalanceInfo detail : regions) {
+     header.append(" ").append(detail.getRegionPath());
+   }
+   String headerText = header.toString();
+   table.setHeader(table.getHeader() + headerText);
+
+   cache.getLogger().info(headerText + logText);
+   return rebalanceResultData;
+ }
+
+ /**
+  * Appends one "label / value" row to the table and the matching "label = value" line to the
+  * log text.
+  */
+ private static void addRebalanceStat(TabularResultData table, StringBuilder logText,
+     String newLine, String label, Object value) {
+   table.accumulate("Rebalanced Stats", label);
+   table.accumulate("Value", value);
+   logText.append(label).append(" = ").append(value).append(newLine);
+ }
+
+ /**
+  * Runs a {@code RebalanceFunction} for every region returned by {@code getMemberRegionList}
+  * that has more than one member, and collects the outcome in one composite result.
+  * <p>
+  * For each such region the members are tried in list order. The member loop stops for a region
+  * as soon as a member was present and {@code checkResultList} returned {@code false} for its
+  * function result; otherwise the next member is tried.
+  *
+  * @param cache cache used to look up regions and members
+  * @param simulate passed to the function as its first argument; compared with {@code "true"}
+  *        when the result is rendered
+  * @param excludeRegionsList region names that are left out; may be {@code null}
+  * @return the result built from the accumulated composite data; an info result when no region
+  *         has more than one member; an error result when an exception reaches the per-region
+  *         catch block
+  */
+ private Result executeRebalanceOnDS(InternalCache cache, String simulate,
+ String[] excludeRegionsList) {
+ Result result = null;
+ int index = 1;
+ CompositeResultData rebalanceResultData = ResultBuilder.createCompositeResultData();
+ List<String> listExcludedRegion = new ArrayList<>();
+ if (excludeRegionsList != null) {
+ Collections.addAll(listExcludedRegion, excludeRegionsList);
+ }
+ // the exclusions are applied here, while the candidate regions are collected
+ List<MemberPRInfo> listMemberRegion = getMemberRegionList(cache, listExcludedRegion);
+
+ if (listMemberRegion.size() == 0) {
+ return ResultBuilder
+ .createInfoResult(CliStrings.REBALANCE__MSG__NO_REBALANCING_REGIONS_ON_DS);
+ }
+
+ Iterator<MemberPRInfo> iterator = listMemberRegion.iterator();
+ boolean flagToContinueWithRebalance = false;
+
+ // check if list has some members that can be rebalanced
+ while (iterator.hasNext()) {
+ if (iterator.next().dsMemberList.size() > 1) {
+ flagToContinueWithRebalance = true;
+ break;
+ }
+ }
+
+ if (!flagToContinueWithRebalance) {
+ return ResultBuilder
+ .createInfoResult(CliStrings.REBALANCE__MSG__NO_REBALANCING_REGIONS_ON_DS);
+ }
+
+ for (MemberPRInfo memberPR : listMemberRegion) {
+ try {
+ // check if there are more than one members associated with region for rebalancing
+ if (memberPR.dsMemberList.size() > 1) {
+ for (int i = 0; i < memberPR.dsMemberList.size(); i++) {
+ DistributedMember dsMember = memberPR.dsMemberList.get(i);
+ Function rebalanceFunction = new RebalanceFunction();
+ // function arguments: [0] simulate flag, [1] regions to include, [2] regions to exclude
+ Object[] functionArgs = new Object[3];
+ functionArgs[0] = simulate;
+ Set<String> regionSet = new HashSet<>();
+
+ regionSet.add(memberPR.region);
+ functionArgs[1] = regionSet;
+
+ // always sent empty: excluded regions never made it into listMemberRegion
+ Set<String> excludeRegionSet = new HashSet<>();
+ functionArgs[2] = excludeRegionSet;
+
+ List resultList = null;
+
+ try {
+ if (checkMemberPresence(dsMember, cache)) {
+ resultList = (ArrayList) CliUtil
+ .executeFunction(rebalanceFunction, functionArgs, dsMember).getResult();
+
+ // a true return sends the loop on to the next member of this region
+ if (checkResultList(rebalanceResultData, resultList, dsMember)) {
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ continue;
+ }
+
+ // the first element of the function result is split on "," before it is rendered
+ List<String> rstList = tokenize((String) resultList.get(0), ",");
+ result = ResultBuilder.buildResult(toCompositeResultData(rebalanceResultData,
+ (ArrayList) rstList, index, simulate.equals("true"), cache));
+ index++;
+
+ // Rebalancing for region is done so break and continue with other region
+ break;
+ } else {
+ // member is not present: report only once the last candidate has been reached
+ if (i == memberPR.dsMemberList.size() - 1) {
+ rebalanceResultData.addSection().addData(
+ CliStrings.format(
+ CliStrings.REBALANCE__MSG__NO_EXECUTION_FOR_REGION_0_ON_MEMBERS_1,
+ memberPR.region, listOfAllMembers(memberPR.dsMemberList)),
+ CliStrings.REBALANCE__MSG__MEMBERS_MIGHT_BE_DEPARTED);
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ } else {
+ continue;
+ }
+ }
+ } catch (Exception ex) {
+ // a failure is reported only for the last candidate; earlier ones are skipped silently
+ if (i == memberPR.dsMemberList.size() - 1) {
+ rebalanceResultData.addSection().addData(
+ CliStrings.format(
+ CliStrings.REBALANCE__MSG__NO_EXECUTION_FOR_REGION_0_ON_MEMBERS_1,
+ memberPR.region, listOfAllMembers(memberPR.dsMemberList)),
+ CliStrings.REBALANCE__MSG__REASON + ex.getMessage());
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ } else {
+ continue;
+ }
+ }
+
+ // Only reached for the last member of a region, after it was absent or its call threw.
+ // NOTE(review): resultList is still null in the "absent" case - confirm that
+ // checkResultList accepts null; otherwise resultList.get(0) below throws and the outer
+ // catch turns the whole command into an error result.
+ if (checkResultList(rebalanceResultData, resultList, dsMember)) {
+ result = ResultBuilder.buildResult(rebalanceResultData);
+ continue;
+ }
+
+ List<String> rstList = tokenize((String) resultList.get(0), ",");
+ result = ResultBuilder.buildResult(toCompositeResultData(rebalanceResultData,
+ (ArrayList) rstList, index, simulate.equals("true"), cache));
+ index++;
+ }
+ }
+ } catch (Exception e) {
+ // aborts the remaining regions: the error result replaces everything collected so far
+ ErrorResultData errorResultData = ResultBuilder.createErrorResultData()
+ .setErrorCode(ResultBuilder.ERRORCODE_DEFAULT).addLine(e.getMessage());
+ return (ResultBuilder.buildResult(errorResultData));
+ }
+ }
+ return result;
+ }
+
+ private static class MemberPRInfo {
+ ArrayList<DistributedMember> dsMemberList;
+ public String region;
+
+ MemberPRInfo() {
+ region = "";
+ dsMemberList = new ArrayList<>();
+ }
+
+ @Override
+ public boolean equals(Object o2) {
+ return o2 != null && this.region.equals(((MemberPRInfo) o2).region);
+ }
+ }
+
+ /**
+  * Builds one {@code MemberPRInfo} per non-excluded region whose distributed region MXBean
+  * reports the type {@code PARTITION} or {@code PERSISTENT_PARTITION}, filled with the members
+  * whose name or id appears in the bean's member list.
+  *
+  * @param cache cache used to reach the management service and the member set
+  * @param listExcludedRegion region names to leave out; each entry is trimmed and compared
+  *        with or without a leading "/"
+  * @return the collected entries, in the order the regions were listed; empty when nothing
+  *         qualifies
+  */
+ private List<MemberPRInfo> getMemberRegionList(InternalCache cache,
+     List<String> listExcludedRegion) {
+   List<MemberPRInfo> regionInfos = new ArrayList<>();
+   String[] allRegionNames =
+       ManagementService.getManagementService(cache).getDistributedSystemMXBean().listRegions();
+   final Set<DistributedMember> allMembers = CliUtil.getAllMembers(cache);
+
+   for (String listedName : allRegionNames) {
+     String regionName = listedName;
+     boolean excluded = false;
+
+     for (String rawExcluded : listExcludedRegion) {
+       String excludedName = rawExcluded.trim();
+       // either name may come with or without the leading "/": align them before comparing
+       if (regionName.startsWith("/") && !excludedName.startsWith("/")) {
+         excludedName = "/" + excludedName;
+       }
+       if (excludedName.startsWith("/") && !regionName.startsWith("/")) {
+         regionName = "/" + regionName;
+       }
+       if (excludedName.equals(regionName)) {
+         excluded = true;
+         break;
+       }
+     }
+     if (excluded) {
+       continue;
+     }
+
+     // the bean lookup below always uses a path that starts with the separator
+     if (!regionName.startsWith("/")) {
+       regionName = Region.SEPARATOR + regionName;
+     }
+     DistributedRegionMXBean bean =
+         ManagementService.getManagementService(cache).getDistributedRegionMXBean(regionName);
+     if (bean == null) {
+       continue;
+     }
+
+     boolean partitioned = bean.getRegionType().equals(DataPolicy.PARTITION.toString())
+         || bean.getRegionType().equals(DataPolicy.PERSISTENT_PARTITION.toString());
+     if (!partitioned) {
+       continue;
+     }
+
+     String[] hostingNames = bean.getMembers();
+     for (DistributedMember member : allMembers) {
+       for (String hostingName : hostingNames) {
+         if (!MBeanJMXAdapter.getMemberNameOrId(member).equals(hostingName)) {
+           continue;
+         }
+
+         // probe entry: equal (by region) to an entry that may already be in the list
+         MemberPRInfo probe = new MemberPRInfo();
+         probe.region = regionName;
+         probe.dsMemberList.add(member);
+
+         int existing = regionInfos.indexOf(probe);
+         if (existing >= 0) {
+           // add member for appropriate region
+           regionInfos.get(existing).dsMemberList.add(member);
+         } else {
+           regionInfos.add(probe);
+         }
+         break;
+       }
+     }
+   }
+
+   return regionInfos;
+ }
+
+ /**
+  * Checks for the member's presence just before a function is executed on it, to avoid
+  * running a function on departed members (#47248).
+  *
+  * @param dsMember member that is about to be used
+  * @param cache cache handed to {@code CliUtil.getAllNormalMembers}
+  * @return {@code true} when the member is contained in the set of normal members
+  */
+ private boolean checkMemberPresence(DistributedMember dsMember, InternalCache cache) {
+   return CliUtil.getAllNormalMembers(cache).contains(dsMember);
+ }
+
+ /**
+  * Renders the ids of all given members as one string, separated by {@code " ; "}.
+  * <p>
+  * Every member is included and no separator follows the last id. The earlier loop stopped one
+  * element short, so the last member was missing and the text ended in a dangling separator.
+  *
+  * @param dsMemberList members to list; must not be {@code null}
+  * @return the joined ids, or an empty string for an empty list
+  */
+ private String listOfAllMembers(ArrayList<DistributedMember> dsMemberList) {
+   StringBuilder memberIds = new StringBuilder();
+   boolean first = true;
+   for (DistributedMember member : dsMemberList) {
+     if (!first) {
+       memberIds.append(" ; ");
+     }
+     memberIds.append(member.getId());
+     first = false;
+   }
+   return memberIds.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RemoveCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RemoveCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RemoveCommand.java
new file mode 100644
index 0000000..12be62e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/RemoveCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.callFunctionForRegion;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.getRegionAssociatedMembers;
+import static org.apache.geode.management.internal.cli.commands.DataCommandsUtils.makePresentationResult;
+
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.Result;
+import org.apache.geode.management.internal.cli.domain.DataCommandRequest;
+import org.apache.geode.management.internal.cli.domain.DataCommandResult;
+import org.apache.geode.management.internal.cli.functions.DataCommandFunction;
+import org.apache.geode.management.internal.cli.i18n.CliStrings;
+
+ /**
+  * Gfsh command that removes a single key, or all keys, from a region.
+  */
+ public class RemoveCommand implements GfshCommand {
+   /**
+    * Validates the options, authorizes the write, and removes the entry (or all entries).
+    * <p>
+    * When {@code cache.getRegion(regionPath)} finds the region, the removal is done through a
+    * local {@code DataCommandFunction}. Otherwise a {@code DataCommandRequest} is handed to
+    * {@code callFunctionForRegion} together with the members returned by
+    * {@code getRegionAssociatedMembers}; without such members an info result is produced.
+    *
+    * @param key key to remove; required unless {@code removeAllKeys} is set
+    * @param regionPath path of the region to remove from
+    * @param removeAllKeys whether every key of the region is to be removed
+    * @param keyClass class name for the key; forwarded unchanged
+    * @return the presentation form of the remove result
+    */
+   @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+   @CliCommand(value = {CliStrings.REMOVE}, help = CliStrings.REMOVE__HELP)
+   public Result remove(
+       @CliOption(key = {CliStrings.REMOVE__KEY}, help = CliStrings.REMOVE__KEY__HELP,
+           specifiedDefaultValue = "") String key,
+       @CliOption(key = {CliStrings.REMOVE__REGION}, mandatory = true,
+           help = CliStrings.REMOVE__REGION__HELP,
+           optionContext = ConverterHint.REGION_PATH) String regionPath,
+       @CliOption(key = CliStrings.REMOVE__ALL, help = CliStrings.REMOVE__ALL__HELP,
+           specifiedDefaultValue = "true", unspecifiedDefaultValue = "false") boolean removeAllKeys,
+       @CliOption(key = {CliStrings.REMOVE__KEYCLASS},
+           help = CliStrings.REMOVE__KEYCLASS__HELP) String keyClass) {
+     InternalCache cache = getCache();
+
+     // option validation: a region is always needed, a key unless everything is removed
+     if (StringUtils.isEmpty(regionPath)) {
+       return makePresentationResult(DataCommandResult.createRemoveResult(key, null, null,
+           CliStrings.REMOVE__MSG__REGIONNAME_EMPTY, false));
+     }
+     if (key == null && !removeAllKeys) {
+       return makePresentationResult(DataCommandResult.createRemoveResult(null, null, null,
+           CliStrings.REMOVE__MSG__KEY_EMPTY, false));
+     }
+
+     // authorization is per key for a single removal, per region otherwise
+     if (removeAllKeys) {
+       cache.getSecurityService().authorizeRegionWrite(regionPath);
+     } else {
+       cache.getSecurityService().authorizeRegionWrite(regionPath, key);
+     }
+
+     final String removeAllFlag = removeAllKeys ? "ALL" : null;
+
+     @SuppressWarnings("rawtypes")
+     Region region = cache.getRegion(regionPath);
+     DataCommandFunction removefn = new DataCommandFunction();
+
+     DataCommandResult dataResult;
+     if (region != null) {
+       dataResult = removefn.remove(key, keyClass, regionPath, removeAllFlag);
+     } else {
+       Set<DistributedMember> memberList =
+           getRegionAssociatedMembers(regionPath, getCache(), false);
+       if (CollectionUtils.isNotEmpty(memberList)) {
+         DataCommandRequest request = new DataCommandRequest();
+         request.setCommand(CliStrings.REMOVE);
+         request.setKey(key);
+         request.setKeyClass(keyClass);
+         request.setRemoveAllKeys(removeAllFlag);
+         request.setRegionName(regionPath);
+         dataResult = callFunctionForRegion(request, removefn, memberList);
+       } else {
+         dataResult = DataCommandResult.createRemoveInfoResult(key, null, null,
+             CliStrings.format(CliStrings.REMOVE__MSG__REGION_NOT_FOUND_ON_ALL_MEMBERS,
+                 regionPath),
+             false);
+       }
+     }
+     dataResult.setKeyClass(keyClass);
+
+     return makePresentationResult(dataResult);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/DataCommandsController.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/DataCommandsController.java b/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/DataCommandsController.java
index ff100b5..ce7ebfc 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/DataCommandsController.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/web/controllers/DataCommandsController.java
@@ -34,7 +34,13 @@ import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
* the Gfsh Data Commands.
* <p/>
*
- * @see org.apache.geode.management.internal.cli.commands.DataCommands
+ * @see org.apache.geode.management.internal.cli.commands.ExportDataCommand
+ * @see org.apache.geode.management.internal.cli.commands.GetCommand
+ * @see org.apache.geode.management.internal.cli.commands.ImportDataCommand
+ * @see org.apache.geode.management.internal.cli.commands.LocateEntryCommand
+ * @see org.apache.geode.management.internal.cli.commands.PutCommand
+ * @see org.apache.geode.management.internal.cli.commands.RebalanceCommand
+ * @see org.apache.geode.management.internal.cli.commands.RemoveCommand
* @see org.apache.geode.management.internal.web.controllers.AbstractCommandsController
* @see org.springframework.stereotype.Controller
* @see org.springframework.web.bind.annotation.PathVariable
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/test/java/org/apache/geode/management/internal/cli/CommandRequestTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CommandRequestTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CommandRequestTest.java
index 5e60f0b..0d6e7cc 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CommandRequestTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CommandRequestTest.java
@@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.geode.management.internal.cli.commands.DataCommands;
+import org.apache.geode.management.internal.cli.commands.RebalanceCommand;
import org.apache.geode.test.junit.categories.UnitTest;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +42,7 @@ public class CommandRequestTest {
this.mockParseResult = mock(GfshParseResult.class);
when(this.mockParseResult.getUserInput()).thenReturn("rebalance --simulate=true --time-out=-1");
when(this.mockParseResult.getParamValueStrings()).thenReturn(this.paramValues);
- when(this.mockParseResult.getMethod()).thenReturn(DataCommands.class.getMethod("rebalance",
+ when(this.mockParseResult.getMethod()).thenReturn(RebalanceCommand.class.getMethod("rebalance",
String[].class, String[].class, long.class, boolean.class));
this.mockEnvironment = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GemfireDataCommandsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GemfireDataCommandsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GemfireDataCommandsDUnitTest.java
index 96aa526..9da8072 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GemfireDataCommandsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GemfireDataCommandsDUnitTest.java
@@ -12,6 +12,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
+
package org.apache.geode.management.internal.cli.commands;
import static org.apache.geode.distributed.ConfigurationProperties.*;
@@ -412,7 +413,7 @@ public class GemfireDataCommandsDUnitTest extends CliCommandTestBase {
getLogWriter().info("Region in query : " + regionsInQuery);
if (regionsInQuery.size() > 0) {
Set<DistributedMember> members =
- DataCommands.getQueryRegionsAssociatedMembers(regionsInQuery, cache, returnAll);
+ DataCommandsUtils.getQueryRegionsAssociatedMembers(regionsInQuery, cache, returnAll);
getLogWriter().info("Members for Region in query : " + members);
if (expectedMembers != -1) {
assertNotNull(members);
http://git-wip-us.apache.org/repos/asf/geode/blob/32364169/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GetCommandOnRegionWithCacheLoaderDuringCacheMissDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GetCommandOnRegionWithCacheLoaderDuringCacheMissDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GetCommandOnRegionWithCacheLoaderDuringCacheMissDUnitTest.java
index e430215..cebb7f1 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GetCommandOnRegionWithCacheLoaderDuringCacheMissDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/GetCommandOnRegionWithCacheLoaderDuringCacheMissDUnitTest.java
@@ -50,7 +50,7 @@ import static org.apache.geode.test.dunit.Wait.waitForCriterion;
* CacheLoader defined.
*
* @see org.apache.geode.management.internal.cli.commands.CliCommandTestBase
- * @see org.apache.geode.management.internal.cli.commands.DataCommands
+ * @see org.apache.geode.management.internal.cli.commands.GetCommand
* @since GemFire 8.0
*/
@SuppressWarnings("unused")