You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by yi...@apache.org on 2023/02/12 13:56:15 UTC
[ozone] 03/04: HDDS-7205. DiskBalancer CLI (#3739)
This is an automated email from the ASF dual-hosted git repository.
yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 28cfeab74b184830e77add701b2f4ab3ae81b86b
Author: Symious <yi...@foxmail.com>
AuthorDate: Thu Sep 15 10:02:49 2022 +0800
HDDS-7205. DiskBalancer CLI (#3739)
---
.../hdds/scm/server/SCMClientProtocolServer.java | 6 +-
.../hdds/scm/cli/datanode/DatanodeCommands.java | 3 +-
.../scm/cli/datanode/DiskBalancerCommands.java | 99 ++++++++
.../cli/datanode/DiskBalancerReportSubcommand.java | 70 ++++++
.../cli/datanode/DiskBalancerStartSubcommand.java | 100 ++++++++
.../cli/datanode/DiskBalancerStatusSubcommand.java | 103 ++++++++
.../cli/datanode/DiskBalancerStopSubcommand.java | 82 +++++++
.../cli/datanode/DiskBalancerUpdateSubcommand.java | 98 ++++++++
.../cli/datanode/TestDiskBalancerSubCommand.java | 259 +++++++++++++++++++++
9 files changed, 816 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index d3f23089be..a20edaa05a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1162,14 +1162,14 @@ public class SCMClientProtocolServer implements
Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
Optional<List<String>> hosts) throws IOException {
// TODO: Send message to datanodes
- return null;
+ return new ArrayList<DatanodeAdminError>();
}
@Override
public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
throws IOException {
// TODO: Send message to datanodes
- return null;
+ return new ArrayList<DatanodeAdminError>();
}
@@ -1179,7 +1179,7 @@ public class SCMClientProtocolServer implements
Optional<Integer> parallelThread, Optional<List<String>> hosts)
throws IOException {
// TODO: Send message to datanodes
- return null;
+ return new ArrayList<DatanodeAdminError>();
}
/**
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DatanodeCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DatanodeCommands.java
index 8cb2114f57..4c7f71037f 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DatanodeCommands.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DatanodeCommands.java
@@ -41,7 +41,8 @@ import java.util.concurrent.Callable;
DecommissionSubCommand.class,
MaintenanceSubCommand.class,
RecommissionSubCommand.class,
- UsageInfoSubcommand.class
+ UsageInfoSubcommand.class,
+ DiskBalancerCommands.class
})
@MetaInfServices(SubcommandWithParent.class)
public class DatanodeCommands implements Callable<Void>, SubcommandWithParent {
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java
new file mode 100644
index 0000000000..649ee59eaa
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerCommands.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Model.CommandSpec;
+import picocli.CommandLine.Spec;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Subcommand to group disk balancer related operations.
+ *
+ * <p>The balancer is a tool that balances space usage on an Ozone datanode
+ * when some disks become full or when new empty disks were added to a datanode.
+ *
+ * <p>SYNOPSIS
+ * <pre>
+ * To start:
+ * ozone admin datanode diskbalancer start
+ * [ -t/--threshold {@literal <threshold>}]
+ * [ -b/--bandwidthInMB {@literal <bandwidthInMB>}]
+ * [ -p/--parallelThread {@literal <parallelThread>}]
+ * [ -a/--alldatanodes {@literal <alldatanodes>}]
+ * [ {@literal <hosts>}]
+ * Examples:
+ * ozone admin datanode diskbalancer start {@literal <hosts>}
+ * start balancer with default values in the configuration on specified
+ * datanodes
+ * ozone admin datanode diskbalancer start -a
+ * start balancer with default values in the configuration on all
+ * datanodes in the cluster
+ * ozone admin datanode diskbalancer start -t 5 {@literal <hosts>}
+ * start balancer with a threshold of 5%
+ * ozone admin datanode diskbalancer start -b 20 {@literal <hosts>}
+ * start balancer with maximum 20MB/s diskbandwidth
+ * ozone admin datanode diskbalancer start -p 5 {@literal <hosts>}
+ * start balancer with 5 parallel thread on each datanode
+ * To stop:
+ * ozone admin datanode diskbalancer stop -a
+ * stop diskblancer on all datanodes
+ * ozone admin datanode diskbalancer stop {@literal <hosts>};
+ * stop diskblancer on all datanodes
+ * To update:
+ * ozone admin datanode diskbalancer update -a
+ * update diskblancer configuration on all datanodes
+ * ozone admin datanode diskbalancer update {@literal <hosts>};
+ * update diskblancer configuration on all datanodes
+ * To get report:
+ * ozone admin datanode diskbalancer report -c 10
+ * retrieve at most 10 datanodes that needs diskbalance most
+ * To get status:
+ * ozone admin datanode diskbalancer status -s RUNNING {@literal <hosts>}
+ * return the diskbalancer status on datanodes where diskbalancer are in
+ * Running state
+ *
+ * </pre>
+ */
+
+@Command(
+ name = "diskbalancer",
+ description = "DiskBalancer specific operations",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class,
+ subcommands = {
+ DiskBalancerStartSubcommand.class,
+ DiskBalancerStopSubcommand.class,
+ DiskBalancerUpdateSubcommand.class,
+ DiskBalancerReportSubcommand.class,
+ DiskBalancerStatusSubcommand.class
+ })
+public class DiskBalancerCommands implements Callable<Void> {
+
+ @Spec
+ private CommandSpec spec;
+
+ @Override
+ public Void call() throws Exception {
+ GenericCli.missingSubcommand(spec);
+ return null;
+ }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerReportSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerReportSubcommand.java
new file mode 100644
index 0000000000..deb7b07043
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerReportSubcommand.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handler to get Datanode Volume Density report.
+ */
+@Command(
+ name = "report",
+ description = "Get Datanode Volume Density Report",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class DiskBalancerReportSubcommand extends ScmSubcommand {
+ @Option(names = {"-c", "--count"},
+ description = "Result count to return. Sort by Volume Density " +
+ "in descending order.")
+ private int count;
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> resultProto =
+ scmClient.getDiskBalancerReport(count);
+ System.out.println(generateReport(resultProto));
+ }
+
+ private String generateReport(
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> protos) {
+ StringBuilder formatBuilder = new StringBuilder("Report result:%n" +
+ "%-50s %s%n");
+
+ List<String> contentList = new ArrayList<>();
+ contentList.add("Datanode");
+ contentList.add("VolumeDensity");
+
+ for (HddsProtos.DatanodeDiskBalancerInfoProto proto: protos) {
+ formatBuilder.append("%-50s %s%n");
+ contentList.add(proto.getNode().getHostName());
+ contentList.add(String.valueOf(proto.getCurrentVolumeDensitySum()));
+ }
+
+ return String.format(formatBuilder.toString(),
+ contentList.toArray(new String[0]));
+ }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
new file mode 100644
index 0000000000..e3862ba23f
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStartSubcommand.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Parameters;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Handler to start disk balancer.
+ */
+@Command(
+ name = "start",
+ description = "Start DiskBalancer",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class DiskBalancerStartSubcommand extends ScmSubcommand {
+
+ @Option(names = {"-t", "--threshold"},
+ description = "Percentage deviation from average utilization of " +
+ "the disks after which a datanode will be rebalanced (for " +
+ "example, '10' for 10%%).")
+ private Optional<Double> threshold;
+
+ @Option(names = {"-b", "--bandwidthInMB"},
+ description = "Maximum bandwidth for DiskBalancer per second.")
+ private Optional<Long> bandwidthInMB;
+
+ @Option(names = {"-p", "--parallelThread"},
+ description = "Max parallelThread for DiskBalancer.")
+ private Optional<Integer> parallelThread;
+
+ @Option(names = {"-a", "--allDatanodes"},
+ description = "Start diskBalancer on all datanodes.")
+ private boolean allHosts;
+
+ @Parameters(description = "List of fully qualified host names")
+ private List<String> hosts = new ArrayList<>();
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ if (hosts.size() == 0 && !allHosts) {
+ System.out.println("Datanode not specified. Please specify " +
+ "\"--allDatanodes\" to start diskBalancer on all datanodes");
+ return;
+ }
+ if (hosts.size() != 0 && allHosts) {
+ System.out.println("Confused options. Omit \"--allDatanodes\" or " +
+ "Datanodes.");
+ return;
+ }
+ List<DatanodeAdminError> errors =
+ scmClient.startDiskBalancer(threshold, bandwidthInMB, parallelThread,
+ hosts.size() == 0 ? Optional.empty() : Optional.of(hosts));
+
+ System.out.println("Start DiskBalancer on datanode(s):\n" +
+ (allHosts ? "All datanodes" : String.join("\n", hosts)));
+
+ if (errors.size() > 0) {
+ for (DatanodeAdminError error : errors) {
+ System.err.println("Error: " + error.getHostname() + ": "
+ + error.getError());
+ }
+ // Throwing the exception will cause a non-zero exit status for the
+ // command.
+ throw new IOException(
+ "Some nodes could not start DiskBalancer.");
+ }
+ }
+
+ @VisibleForTesting
+ public void setAllHosts(boolean allHosts) {
+ this.allHosts = allHosts;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
new file mode 100644
index 0000000000..6a61876479
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Parameters;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Handler to get disk balancer status.
+ */
+@Command(
+ name = "status",
+ description = "Get Datanode DiskBalancer Status",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class DiskBalancerStatusSubcommand extends ScmSubcommand {
+
+ private final Set<String> stateSet =
+ new HashSet<>(Arrays.asList("RUNNING", "STOPPED", "UNKNOWN"));
+
+ @Option(names = {"-s", "--state"},
+ description = "RUNNING, STOPPED, UNKNOWN")
+ private String state = "RUNNING";
+
+ @Parameters(description = "List of fully qualified host names")
+ private List<String> hosts = new ArrayList<>();
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ if (state != null && !stateSet.contains(state.toUpperCase(Locale.ROOT))) {
+ System.err.println("Unsupported state: " + state);
+ }
+
+ assert state != null;
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> resultProto =
+ scmClient.getDiskBalancerStatus(
+ hosts.size() == 0 ? Optional.empty() : Optional.of(hosts),
+ Optional.of(HddsProtos.DiskBalancerRunningStatus.valueOf(
+ state.toUpperCase(Locale.ROOT)))
+ );
+
+ System.out.println(generateStatus(resultProto));
+ }
+
+ private String generateStatus(
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> protos) {
+ StringBuilder formatBuilder = new StringBuilder("Status result:%n" +
+ "%-50s %s %s %s %s %s%n");
+
+ List<String> contentList = new ArrayList<>();
+ contentList.add("Datanode");
+ contentList.add("VolumeDensity");
+ contentList.add("Status");
+ contentList.add("Threshold");
+ contentList.add("BandwidthInMB");
+ contentList.add("ParallelThread");
+
+ for (HddsProtos.DatanodeDiskBalancerInfoProto proto: protos) {
+ formatBuilder.append("%-50s %s %s %s %s %s%n");
+ contentList.add(proto.getNode().getHostName());
+ contentList.add(String.valueOf(proto.getCurrentVolumeDensitySum()));
+ contentList.add(proto.getRunningStatus().name());
+ contentList.add(
+ String.valueOf(proto.getDiskBalancerConf().getThreshold()));
+ contentList.add(
+ String.valueOf(proto.getDiskBalancerConf().getDiskBandwidthInMB()));
+ contentList.add(
+ String.valueOf(proto.getDiskBalancerConf().getParallelThread()));
+ }
+
+ return String.format(formatBuilder.toString(),
+ contentList.toArray(new String[0]));
+ }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStopSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStopSubcommand.java
new file mode 100644
index 0000000000..9479c030a6
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStopSubcommand.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Parameters;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Handler to stop disk balancer.
+ */
+@Command(
+ name = "stop",
+ description = "Stop DiskBalancer",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class DiskBalancerStopSubcommand extends ScmSubcommand {
+ @Option(names = {"-a", "--allDatanodes"},
+ description = "Stop diskBalancer on all datanodes.")
+ private boolean allHosts;
+
+ @Parameters(description = "List of fully qualified host names")
+ private List<String> hosts = new ArrayList<>();
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ if (hosts.size() == 0 && !allHosts) {
+ System.out.println("Datanode not specified.");
+ return;
+ }
+ if (hosts.size() != 0 && allHosts) {
+ System.out.println("Confused options. Omit \"--allDatanodes\" or " +
+ "Datanodes.");
+ return;
+ }
+ List<DatanodeAdminError> errors = scmClient.stopDiskBalancer(allHosts ?
+ Optional.empty() : Optional.of(hosts));
+
+ System.out.println("Stopping DiskBalancer on datanode(s):\n" +
+ (allHosts ? "All datanodes" : String.join("\n", hosts)));
+ if (errors.size() > 0) {
+ for (DatanodeAdminError error : errors) {
+ System.err.println("Error: " + error.getHostname() + ": "
+ + error.getError());
+ }
+ // Throwing the exception will cause a non-zero exit status for the
+ // command.
+ throw new IOException(
+ "Some nodes could not stop DiskBalancer.");
+ }
+ }
+
+ @VisibleForTesting
+ public void setAllHosts(boolean allHosts) {
+ this.allHosts = allHosts;
+ }
+}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
new file mode 100644
index 0000000000..b19fb6a53a
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerUpdateSubcommand.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+import picocli.CommandLine.Parameters;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Handler to update disk balancer configuration.
+ */
+@Command(
+ name = "update",
+ description = "Update DiskBalancer Configuration",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class DiskBalancerUpdateSubcommand extends ScmSubcommand {
+
+ @Option(names = {"-t", "--threshold"},
+ description = "Percentage deviation from average utilization of " +
+ "the disks after which a datanode will be rebalanced (for " +
+ "example, '10' for 10%%).")
+ private Optional<Double> threshold;
+
+ @Option(names = {"-b", "--bandwidthInMB"},
+ description = "Maximum bandwidth for DiskBalancer per second.")
+ private Optional<Long> bandwidthInMB;
+
+ @Option(names = {"-p", "--parallelThread"},
+ description = "Max parallelThread for DiskBalancer.")
+ private Optional<Integer> parallelThread;
+
+ @Option(names = {"-a", "--allDatanodes"},
+ description = "Start diskBalancer on all datanodes.")
+ private boolean allHosts;
+
+ @Parameters(description = "List of fully qualified host names")
+ private List<String> hosts = new ArrayList<>();
+
+ @Override
+ public void execute(ScmClient scmClient) throws IOException {
+ if (hosts.size() == 0 && !allHosts) {
+ System.out.println("Datanode not specified.");
+ return;
+ }
+ if (hosts.size() != 0 && allHosts) {
+ System.out.println("Confused options. Omit \"--allDatanodes\" or " +
+ "Datanodes.");
+ return;
+ }
+ List<DatanodeAdminError> errors =
+ scmClient.updateDiskBalancerConfiguration(threshold, bandwidthInMB,
+ parallelThread,
+ hosts.size() == 0 ? Optional.empty() : Optional.of(hosts));
+
+ System.out.println("Update DiskBalancer Configuration on datanode(s):\n" +
+ (allHosts ? "All datanodes" : String.join("\n", hosts)));
+
+ if (errors.size() > 0) {
+ for (DatanodeAdminError error : errors) {
+ System.err.println("Error: " + error.getHostname() + ": "
+ + error.getError());
+ }
+ throw new IOException(
+ "Some nodes could not update DiskBalancer.");
+ }
+ }
+
+ @VisibleForTesting
+ public void setAllHosts(boolean allHosts) {
+ this.allHosts = allHosts;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
new file mode 100644
index 0000000000..702e6ee7c5
--- /dev/null
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.datanode;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests to validate the DiskBalancerSubCommand class includes the
+ * correct output when executed against a mock client.
+ */
+public class TestDiskBalancerSubCommand {
+
+ private DiskBalancerStopSubcommand stopCmd;
+ private DiskBalancerStartSubcommand startCmd;
+ private DiskBalancerUpdateSubcommand updateCmd;
+ private DiskBalancerReportSubcommand reportCmd;
+ private DiskBalancerStatusSubcommand statusCmd;
+ private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+ private final PrintStream originalOut = System.out;
+ private final PrintStream originalErr = System.err;
+ private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
+
+ private Random random = new Random();
+
+ @BeforeEach
+ public void setup() throws UnsupportedEncodingException {
+ stopCmd = new DiskBalancerStopSubcommand();
+ startCmd = new DiskBalancerStartSubcommand();
+ updateCmd = new DiskBalancerUpdateSubcommand();
+ reportCmd = new DiskBalancerReportSubcommand();
+ statusCmd = new DiskBalancerStatusSubcommand();
+ System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
+ System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
+ }
+
+ @AfterEach
+ public void tearDown() {
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+ }
+
+ @Test
+ public void testDiskBalancerReportSubcommand()
+ throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+
+ //test report
+ Mockito.when(scmClient.getDiskBalancerReport(Mockito.any(Integer.class)))
+ .thenReturn(generateReport(10));
+
+ reportCmd.execute(scmClient);
+
+ // 2 Headers + 10 results
+ assertEquals(12, newLineCount(outContent.toString(DEFAULT_ENCODING)));
+ }
+
+ @Test
+ public void testDiskBalancerStatusSubcommand()
+ throws IOException {
+ ScmClient scmClient = mock(ScmClient.class);
+
+ //test status
+ Mockito.when(scmClient.getDiskBalancerStatus(Mockito.any(), Mockito.any()))
+ .thenReturn(generateStatus(10));
+
+ statusCmd.execute(scmClient);
+
+ // 2 Headers + 10 results
+ assertEquals(12, newLineCount(outContent.toString(DEFAULT_ENCODING)));
+ }
+
+ @Test
+ public void testDiskBalancerStartSubcommand() throws IOException {
+ startCmd.setAllHosts(true);
+ ScmClient scmClient = mock(ScmClient.class);
+
+ // Return error
+ Mockito.when(scmClient.startDiskBalancer(Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any()))
+ .thenReturn(generateError(10));
+
+ try {
+ startCmd.execute(scmClient);
+ } catch (IOException e) {
+ assertEquals("Some nodes could not start DiskBalancer.", e.getMessage());
+ }
+
+ // Do not return error
+ Mockito.when(scmClient.startDiskBalancer(Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any()))
+ .thenReturn(generateError(0));
+
+ try {
+ startCmd.execute(scmClient);
+ } catch (IOException e) {
+ fail("Should not catch exception here.");
+ }
+
+ startCmd.setAllHosts(false);
+ }
+
+ @Test
+ public void testDiskBalancerUpdateSubcommand() throws IOException {
+ updateCmd.setAllHosts(true);
+ ScmClient scmClient = mock(ScmClient.class);
+
+ // Return error
+ Mockito.when(scmClient.updateDiskBalancerConfiguration(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(generateError(10));
+
+ try {
+ updateCmd.execute(scmClient);
+ } catch (IOException e) {
+ assertEquals("Some nodes could not update DiskBalancer.", e.getMessage());
+ }
+
+ // Do not return error
+ Mockito.when(scmClient.updateDiskBalancerConfiguration(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any()))
+ .thenReturn(generateError(0));
+
+ try {
+ updateCmd.execute(scmClient);
+ } catch (IOException e) {
+ fail("Should not catch exception here.");
+ }
+
+ updateCmd.setAllHosts(false);
+ }
+
+ @Test
+ public void testDiskBalancerStopSubcommand() throws IOException {
+ stopCmd.setAllHosts(true);
+ ScmClient scmClient = mock(ScmClient.class);
+
+ // Return error
+ Mockito.when(scmClient.stopDiskBalancer(Mockito.any()))
+ .thenReturn(generateError(10));
+
+ try {
+ stopCmd.execute(scmClient);
+ } catch (IOException e) {
+ assertEquals("Some nodes could not stop DiskBalancer.", e.getMessage());
+ }
+
+ // Do not return error
+ Mockito.when(scmClient.stopDiskBalancer(Mockito.any()))
+ .thenReturn(generateError(0));
+
+ try {
+ stopCmd.execute(scmClient);
+ } catch (IOException e) {
+ fail("Should not catch exception here.");
+ }
+
+ stopCmd.setAllHosts(false);
+ }
+
+
+ private List<DatanodeAdminError> generateError(int count) {
+ List<DatanodeAdminError> result = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ result.add(new DatanodeAdminError(UUID.randomUUID().toString(),
+ "ERROR"));
+ }
+ return result;
+ }
+
+ private List<HddsProtos.DatanodeDiskBalancerInfoProto> generateReport(
+ int count) {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> result = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ result.add(generateReport());
+ }
+ return result;
+ }
+
+ private List<HddsProtos.DatanodeDiskBalancerInfoProto> generateStatus(
+ int count) {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> result = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ result.add(generateStatus());
+ }
+ return result;
+ }
+
+ private HddsProtos.DatanodeDiskBalancerInfoProto generateReport() {
+ return HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+ .setNode(generateDatanodeDetails())
+ .setCurrentVolumeDensitySum(random.nextDouble())
+ .build();
+ }
+
+ private HddsProtos.DatanodeDiskBalancerInfoProto generateStatus() {
+ return HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+ .setNode(generateDatanodeDetails())
+ .setCurrentVolumeDensitySum(random.nextDouble())
+ .setRunningStatus(HddsProtos.DiskBalancerRunningStatus.
+ valueOf(random.nextInt(2) + 1))
+ .setDiskBalancerConf(
+ HddsProtos.DiskBalancerConfigurationProto.newBuilder().build())
+ .build();
+ }
+
+ private HddsProtos.DatanodeDetailsProto generateDatanodeDetails() {
+ return HddsProtos.DatanodeDetailsProto.newBuilder()
+ .setHostName(UUID.randomUUID().toString())
+ .setIpAddress("1.1.1.1")
+ .build();
+ }
+
+ private int newLineCount(String str) {
+ int res = 0;
+ String[] lines = str.split("\n");
+ for (String line : lines) {
+ if (line.length() != 0) {
+ res++;
+ }
+ }
+ return res;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org