You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by yi...@apache.org on 2023/02/12 13:56:14 UTC
[ozone] 02/04: HDDS-7155. [DiskBalancer] Create interface between SCM and DN (#3701)
This is an automated email from the ASF dual-hosted git repository.
yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit c4a4cc0990d0dc8ea64c2597a4de3449b51a9ab5
Author: Symious <yi...@foxmail.com>
AuthorDate: Wed Sep 7 12:19:57 2022 +0800
HDDS-7155. [DiskBalancer] Create interface between SCM and DN (#3701)
---
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 4 +
.../apache/hadoop/hdds/scm/client/ScmClient.java | 21 ++--
.../protocol/StorageContainerLocationProtocol.java | 17 ++--
.../scm/storage/DiskBalancerConfiguration.java | 39 +++----
.../common/src/main/resources/ozone-default.xml | 8 ++
.../common/report/DiskBalancerReportPublisher.java | 66 ++++++++++++
.../common/report/ReportPublisherFactory.java | 3 +
.../common/statemachine/DatanodeStateMachine.java | 2 +
.../common/statemachine/StateContext.java | 13 +++
.../commandhandler/DiskBalancerCommandHandler.java | 112 +++++++++++++++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 7 ++
.../ozone/container/ozoneimpl/OzoneContainer.java | 6 ++
.../protocol/commands/DiskBalancerCommand.java | 74 ++++++++++++++
.../common/report/TestReportPublisher.java | 42 ++++++++
.../common/report/TestReportPublisherFactory.java | 12 +++
...inerLocationProtocolClientSideTranslatorPB.java | 81 +++++++++++++--
.../src/main/proto/ScmAdminProtocol.proto | 24 ++++-
.../interface-client/src/main/proto/hdds.proto | 10 +-
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 17 ++++
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 9 ++
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 111 ++++++++++++++------
.../hdds/scm/node/DiskBalancerReportHandler.java | 65 ++++++++++++
.../hadoop/hdds/scm/node/DiskBalancerStatus.java | 2 -
.../apache/hadoop/hdds/scm/node/NodeManager.java | 8 ++
.../hadoop/hdds/scm/node/SCMNodeManager.java | 17 ++++
...inerLocationProtocolServerSideTranslatorPB.java | 52 +++++++++-
.../hdds/scm/server/SCMClientProtocolServer.java | 22 ++--
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 23 +++++
.../hdds/scm/server/StorageContainerManager.java | 6 +-
.../hadoop/hdds/scm/container/MockNodeManager.java | 51 +++++++++-
.../hdds/scm/container/SimpleMockNodeManager.java | 5 +
.../hdds/scm/node/TestDiskBalancerManager.java | 34 +++++++
.../testutils/ReplicationNodeManagerMock.java | 6 ++
.../hdds/scm/cli/ContainerOperationClient.java | 29 +++---
.../hadoop/ozone/scm/node/TestDiskBalancer.java | 4 +
35 files changed, 902 insertions(+), 100 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index cb258dfa74..7b3a9c0dd7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -46,6 +46,10 @@ public final class HddsConfigKeys {
"hdds.pipeline.report.interval";
public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
"60s";
+ public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL =
+ "hdds.disk.balancer.report.interval";
+ public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT =
+ "60s";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
"hdds.command.status.report.interval";
public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 76c07d1c1b..98e6e26e87 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -400,7 +400,8 @@ public interface ScmClient extends Closeable {
throws IOException;
/**
- * Get DiskBalancer status.
+ * Get DiskBalancer report.
+ * REPORT shows the current volume density of datanodes.
* @param count top datanodes that need balancing
* @return List of DatanodeDiskBalancerInfo.
* @throws IOException
@@ -410,33 +411,39 @@ public interface ScmClient extends Closeable {
/**
* Get DiskBalancer status.
+ * STATUS shows the running status of DiskBalancer on datanodes.
* @param hosts If hosts is not null, return status of hosts; If hosts is
* null, return status of all datanodes in balancing.
* @return List of DatanodeDiskBalancerInfo.
* @throws IOException
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts) throws IOException;
+ Optional<List<String>> hosts,
+ Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+ throws IOException;
/**
* Start DiskBalancer.
*/
- void startDiskBalancer(
+ List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold,
- Optional<Double> bandwidth,
+ Optional<Long> bandwidthInMB,
+ Optional<Integer> parallelThread,
Optional<List<String>> hosts) throws IOException;
/**
* Stop DiskBalancer.
*/
- void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+ List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ throws IOException;
/**
* Update DiskBalancer Configuration.
*/
- void updateDiskBalancerConfiguration(
+ List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold,
- Optional<Double> bandwidth,
+ Optional<Long> bandwidth,
+ Optional<Integer> parallelThread,
Optional<List<String>> hosts) throws IOException;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 56ed8c7086..afdf36a5f5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -441,26 +441,31 @@ public interface StorageContainerLocationProtocol extends Closeable {
* Get DiskBalancer status.
*/
List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts, int clientVersion) throws IOException;
+ Optional<List<String>> hosts,
+ Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus,
+ int clientVersion) throws IOException;
/**
* Start DiskBalancer.
*/
- void startDiskBalancer(
+ List<DatanodeAdminError> startDiskBalancer(
Optional<Double> threshold,
- Optional<Double> bandwidth,
+ Optional<Long> bandwidthInMB,
+ Optional<Integer> parallelThread,
Optional<List<String>> hosts) throws IOException;
/**
* Stop DiskBalancer.
*/
- void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+ List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+ throws IOException;
/**
* Update DiskBalancer Configuration.
*/
- void updateDiskBalancerConfiguration(
+ List<DatanodeAdminError> updateDiskBalancerConfiguration(
Optional<Double> threshold,
- Optional<Double> bandwidth,
+ Optional<Long> bandwidthInMB,
+ Optional<Integer> parallelThread,
Optional<List<String>> hosts) throws IOException;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index 704b383679..43417f2e81 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -36,7 +36,7 @@ public final class DiskBalancerConfiguration {
private static final Logger LOG =
LoggerFactory.getLogger(DiskBalancerConfiguration.class);
- @Config(key = "volume.density.threshold", type = ConfigType.AUTO,
+ @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE,
defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
description = "Threshold is a percentage in the range of 0 to 100. A " +
"datanode is considered balanced if for each volume, the " +
@@ -45,12 +45,12 @@ public final class DiskBalancerConfiguration {
" of the entire datanode) no more than the threshold.")
private double threshold = 10d;
- @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
+ @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG,
defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
description = "The max balance speed.")
- private double diskBandwidth = 10;
+ private long diskBandwidthInMB = 10;
- @Config(key = "parallel.thread", type = ConfigType.AUTO,
+ @Config(key = "parallel.thread", type = ConfigType.INT,
defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
description = "The max parallel balance thread count.")
private int parallelThread = 5;
@@ -86,21 +86,21 @@ public final class DiskBalancerConfiguration {
*
* @return max disk bandwidth per second
*/
- public double getDiskBandwidth() {
- return diskBandwidth;
+ public double getDiskBandwidthInMB() {
+ return diskBandwidthInMB;
}
/**
* Sets the disk bandwidth value for Disk Balancer.
*
- * @param diskBandwidth the bandwidth to control balance speed
+ * @param diskBandwidthInMB the bandwidth to control balance speed
*/
- public void setDiskBandwidth(double diskBandwidth) {
- if (diskBandwidth <= 0d) {
+ public void setDiskBandwidthInMB(long diskBandwidthInMB) {
+ if (diskBandwidthInMB <= 0L) {
throw new IllegalArgumentException(
- "diskBandwidth must be a value larger than 0.");
+ "diskBandwidthInMB must be a value larger than 0.");
}
- this.diskBandwidth = diskBandwidth;
+ this.diskBandwidthInMB = diskBandwidthInMB;
}
/**
@@ -124,6 +124,7 @@ public final class DiskBalancerConfiguration {
}
this.parallelThread = parallelThread;
}
+
@Override
public String toString() {
return String.format("Disk Balancer Configuration values:%n" +
@@ -132,7 +133,7 @@ public final class DiskBalancerConfiguration {
"%-50s %s%n" +
"%-50s %s%n",
"Key", "Value",
- "Threshold", threshold, "Max disk bandwidth", diskBandwidth,
+ "Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB,
"Parallel Thread", parallelThread);
}
@@ -141,21 +142,21 @@ public final class DiskBalancerConfiguration {
HddsProtos.DiskBalancerConfigurationProto.newBuilder();
builder.setThreshold(threshold)
- .setDiskBandwidth(diskBandwidth)
+ .setDiskBandwidthInMB(diskBandwidthInMB)
.setParallelThread(parallelThread);
return builder;
}
- static DiskBalancerConfiguration fromProtobuf(
+ public static DiskBalancerConfiguration fromProtobuf(
@NotNull HddsProtos.DiskBalancerConfigurationProto proto,
- @NotNull OzoneConfiguration ozoneConfiguration) {
+ @NotNull ConfigurationSource configurationSource) {
DiskBalancerConfiguration config =
- ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
+ configurationSource.getObject(DiskBalancerConfiguration.class);
if (proto.hasThreshold()) {
config.setThreshold(proto.getThreshold());
}
- if (proto.hasDiskBandwidth()) {
- config.setDiskBandwidth(proto.getDiskBandwidth());
+ if (proto.hasDiskBandwidthInMB()) {
+ config.setDiskBandwidthInMB(proto.getDiskBandwidthInMB());
}
if (proto.hasParallelThread()) {
config.setParallelThread(proto.getParallelThread());
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c6adf0c2db..8e3c78b35b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -365,6 +365,14 @@
datanode periodically send pipeline report to SCM. Unit could be
defined with postfix (ns,ms,s,m,h,d)</description>
</property>
+ <property>
+ <name>hdds.disk.balancer.report.interval</name>
+ <value>60000ms</value>
+ <tag>OZONE, CONTAINER, DISK_BALANCER</tag>
+ <description>Time interval of the datanode to send disk balancer report. Each
+ datanode periodically sends disk balancer report to SCM. Unit could be
+ defined with postfix (ns,ms,s,m,h,d)</description>
+ </property>
<property>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
new file mode 100644
index 0000000000..2bb78c1ee6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT;
+
+
+/**
+ * Publishes DiskBalancer report which will be sent to SCM as part of heartbeat.
+ * DiskBalancer Report consist of the following information:
+ * - isBalancerRunning
+ * - balancedBytes
+ * - DiskBalancerConfiguration
+ */
+public class DiskBalancerReportPublisher extends
+ ReportPublisher<DiskBalancerReportProto> {
+
+ private Long diskBalancerReportInterval = null;
+
+ @Override
+ protected long getReportFrequency() {
+ if (diskBalancerReportInterval == null) {
+ diskBalancerReportInterval = getConf().getTimeDuration(
+ HDDS_DISK_BALANCER_REPORT_INTERVAL,
+ HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+ getConf());
+
+ Preconditions.checkState(
+ heartbeatFrequency <= diskBalancerReportInterval,
+ HDDS_DISK_BALANCER_REPORT_INTERVAL +
+ " cannot be configured lower than heartbeat frequency " +
+ heartbeatFrequency + ".");
+ }
+ return diskBalancerReportInterval;
+ }
+
+ @Override
+ protected DiskBalancerReportProto getReport() {
+ return getContext().getParent().getContainer().getDiskBalancerReport();
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index 3be1b5e077..19d2806919 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -55,6 +56,8 @@ public class ReportPublisherFactory {
report2publisher.put(PipelineReportsProto.class,
PipelineReportPublisher.class);
report2publisher.put(CRLStatusReport.class, CRLStatusReportPublisher.class);
+ report2publisher.put(DiskBalancerReportProto.class,
+ DiskBalancerReportPublisher.class);
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 01d114245a..86d92122cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Comm
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CreatePipelineCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DiskBalancerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
@@ -228,6 +229,7 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new SetNodeOperationalStateCommandHandler(conf))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
+ .addHandler(new DiskBalancerCommandHandler())
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index c4d3428a80..c75c7a455d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -98,6 +99,9 @@ public class StateContext {
@VisibleForTesting
static final String CRL_STATUS_REPORT_PROTO_NAME =
CRLStatusReport.getDescriptor().getFullName();
+ @VisibleForTesting
+ static final String DISK_BALANCER_REPORT_PROTO_NAME =
+ DiskBalancerReportProto.getDescriptor().getFullName();
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
@@ -113,6 +117,7 @@ public class StateContext {
private final AtomicReference<Message> nodeReport;
private final AtomicReference<Message> pipelineReports;
private final AtomicReference<Message> crlStatusReport;
+ private final AtomicReference<Message> diskBalancerReport;
// Incremental reports are queued in the map below
private final Map<InetSocketAddress, List<Message>>
incrementalReportsQueue;
@@ -172,6 +177,7 @@ public class StateContext {
nodeReport = new AtomicReference<>();
pipelineReports = new AtomicReference<>();
crlStatusReport = new AtomicReference<>(); // Certificate Revocation List
+ diskBalancerReport = new AtomicReference<>();
endpoints = new HashSet<>();
containerActions = new HashMap<>();
pipelineActions = new HashMap<>();
@@ -197,6 +203,8 @@ public class StateContext {
type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
+ fullReportTypeList.add(DISK_BALANCER_REPORT_PROTO_NAME);
+ type2Reports.put(DISK_BALANCER_REPORT_PROTO_NAME, diskBalancerReport);
}
/**
@@ -929,6 +937,11 @@ public class StateContext {
return crlStatusReport.get();
}
+ @VisibleForTesting
+ public Message getDiskBalancerReport() {
+ return diskBalancerReport.get();
+ }
+
public void configureReconHeartbeatFrequency() {
reconHeartbeatFrequency.set(getReconHeartbeatInterval(conf));
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
new file mode 100644
index 0000000000..705c0af0e1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for DiskBalancer command received from SCM.
+ */
+public class DiskBalancerCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DiskBalancerCommandHandler.class);
+
+ private AtomicLong invocationCount = new AtomicLong(0);
+ private long totalTime;
+
+ /**
+ * Constructs a diskBalancerCommand handler.
+ */
+ public DiskBalancerCommandHandler() {
+ }
+
+ /**
+ * Handles a given SCM command.
+ *
+ * @param command - SCM Command
+ * @param ozoneContainer - Ozone Container.
+ * @param context - Current Context.
+ * @param connectionManager - The SCMs that we are talking to.
+ */
+ @Override
+ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ StateContext context, SCMConnectionManager connectionManager) {
+ invocationCount.incrementAndGet();
+ // TODO: Do start/stop/update operation
+ }
+
+ /**
+ * Returns the command type that this command handler handles.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.diskBalancerCommand;
+ }
+
+ /**
+ * Returns number of times this handler has been invoked.
+ *
+ * @return int
+ */
+ @Override
+ public int getInvocationCount() {
+ return (int)invocationCount.get();
+ }
+
+ /**
+ * Returns the average time this function takes to run.
+ *
+ * @return long
+ */
+ @Override
+ public long getAverageRunTime() {
+ if (invocationCount.get() > 0) {
+ return totalTime / invocationCount.get();
+ }
+ return 0;
+ }
+
+ @Override
+ public int getQueuedCount() {
+ return 0;
+ }
+
+ private void startDiskBalancer(DiskBalancerConfiguration configuration) {
+ // Todo: add implementation to start DiskBalancer
+ }
+
+ private void stopDiskBalancer() {
+ // Todo: add implementation to stop DiskBalancer
+ }
+
+ private void updateDiskBalancer(DiskBalancerConfiguration
+ configuration) {
+ // Todo: add implementation to update diskBalancer configuration
+ }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 3a1bd8ffb3..ac5a14b6aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
@@ -418,6 +419,12 @@ public class HeartbeatEndpointTask
commandResponseProto.getRefreshVolumeUsageCommandProto());
processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
break;
+ case diskBalancerCommand:
+ DiskBalancerCommand diskBalancerCommand =
+ DiskBalancerCommand.getFromProtobuf(
+ commandResponseProto.getDiskBalancerCommandProto(), conf);
+ processCommonCommand(commandResponseProto, diskBalancerCommand);
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index b3db557b6f..14a89bb867 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -521,4 +522,9 @@ public class OzoneContainer {
StorageVolumeChecker getVolumeChecker(ConfigurationSource conf) {
return new StorageVolumeChecker(conf, new Timer());
}
+
+ public DiskBalancerReportProto getDiskBalancerReport() {
+ // TODO: Return real disk balancer report
+ return null;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
new file mode 100644
index 0000000000..ea97f34ca0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerCommandProto;
+
+/**
+ * Informs a datanode to update DiskBalancer status.
+ */
+public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> {
+
+ private final boolean shouldRun;
+ private final DiskBalancerConfiguration diskBalancerConfiguration;
+
+ public DiskBalancerCommand(final boolean shouldRun,
+ final DiskBalancerConfiguration diskBalancerConfiguration) {
+ this.shouldRun = shouldRun;
+ this.diskBalancerConfiguration = diskBalancerConfiguration;
+ }
+
+ /**
+ * Returns the type of this command.
+ *
+ * @return Type
+ */
+ @Override
+ public SCMCommandProto.Type getType() {
+ return SCMCommandProto.Type.diskBalancerCommand;
+ }
+
+ @Override
+ public DiskBalancerCommandProto getProto() {
+ return DiskBalancerCommandProto.newBuilder()
+ .setShouldRun(shouldRun)
+ .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder())
+ .build();
+ }
+
+ public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto
+ diskbalancerCommandProto, ConfigurationSource configuration) {
+ Preconditions.checkNotNull(diskbalancerCommandProto);
+ return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(),
+ DiskBalancerConfiguration.fromProtobuf(
+ diskbalancerCommandProto.getDiskBalancerConf(), configuration));
+ }
+
+ public boolean isShouldRun() {
+ return shouldRun;
+ }
+
+ public DiskBalancerConfiguration getDiskBalancerConfiguration() {
+ return diskBalancerConfiguration;
+ }
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index d611caf126..c09261d4ab 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
import org.apache.hadoop.hdds.protocol.proto.
@@ -39,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.
import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.jupiter.api.Assertions;
@@ -215,6 +218,45 @@ public class TestReportPublisher {
executorService.shutdown();
}
+ @Test
+ public void testDiskBalancerReportPublisher() throws IOException {
+ StateContext dummyContext = Mockito.mock(StateContext.class);
+ DatanodeStateMachine dummyStateMachine =
+ Mockito.mock(DatanodeStateMachine.class);
+ OzoneContainer dummyContainer = Mockito.mock(OzoneContainer.class);
+
+ DiskBalancerReportProto.Builder builder =
+ DiskBalancerReportProto.newBuilder();
+ builder.setIsRunning(true);
+ builder.setBalancedBytes(1L);
+ builder.setDiskBalancerConf(
+ HddsProtos.DiskBalancerConfigurationProto.newBuilder().build());
+ DiskBalancerReportProto dummyReport = builder.build();
+
+ ReportPublisher publisher = new DiskBalancerReportPublisher();
+ when(dummyContext.getParent()).thenReturn(dummyStateMachine);
+ when(dummyStateMachine.getContainer()).thenReturn(dummyContainer);
+ when(dummyContainer.getDiskBalancerReport()).thenReturn(dummyReport);
+ publisher.setConf(config);
+
+ ScheduledExecutorService executorService = HadoopExecutors
+ .newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Unit test ReportManager Thread - %d").build());
+ publisher.init(dummyContext, executorService);
+ Message report =
+ ((DiskBalancerReportPublisher) publisher).getReport();
+ Assert.assertNotNull(report);
+ for (Descriptors.FieldDescriptor descriptor :
+ report.getDescriptorForType().getFields()) {
+ if (descriptor.getNumber() ==
+ DiskBalancerReportProto.ISRUNNING_FIELD_NUMBER) {
+ Assert.assertEquals(true, report.getField(descriptor));
+ }
+ }
+ executorService.shutdown();
+ }
+
/**
* Get a datanode details.
*
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
index ea915f98a2..0b43bb93f9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.junit.Assert;
import org.junit.Rule;
@@ -66,6 +67,17 @@ public class TestReportPublisherFactory {
Assert.assertEquals(conf, publisher.getConf());
}
+ @Test
+ public void testGetDiskBalancerReportPublisher() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+ ReportPublisher publisher = factory
+ .getPublisherFor(DiskBalancerReportProto.class);
+ Assert.assertEquals(DiskBalancerReportPublisher.class,
+ publisher.getClass());
+ Assert.assertEquals(conf, publisher.getConf());
+ }
+
@Test
public void testInvalidReportPublisher() {
OzoneConfiguration conf = new OzoneConfiguration();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 0d35798352..936e31d760 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -45,6 +45,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
@@ -1052,11 +1055,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts, int clientVersion) throws IOException {
+ Optional<List<String>> hosts,
+ Optional<HddsProtos.DiskBalancerRunningStatus> status,
+ int clientVersion) throws IOException {
DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
DatanodeDiskBalancerInfoRequestProto.newBuilder()
.setInfoType(DatanodeDiskBalancerInfoType.status);
hosts.ifPresent(requestBuilder::addAllHosts);
+ status.ifPresent(requestBuilder::setStatus);
DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();
DatanodeDiskBalancerInfoResponseProto response =
@@ -1068,22 +1074,83 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
}
@Override
- public void startDiskBalancer(Optional<Double> threshold,
- Optional<Double> bandwidth, Optional<List<String>> hosts)
- throws IOException {
+ public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+ Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+ Optional<List<String>> hosts) throws IOException {
+ HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
+ HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+ threshold.ifPresent(confBuilder::setThreshold);
+ bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
+ parallelThread.ifPresent(confBuilder::setParallelThread);
+
+ DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+ DatanodeDiskBalancerOpRequestProto.newBuilder()
+ .setOpType(DatanodeDiskBalancerOpType.start)
+ .setConf(confBuilder);
+ hosts.ifPresent(requestBuilder::addAllHosts);
+ DatanodeDiskBalancerOpResponseProto response =
+ submitRequest(Type.DatanodeDiskBalancerOp,
+ builder -> builder.setDatanodeDiskBalancerOpRequest(
+ requestBuilder.build()))
+ .getDatanodeDiskBalancerOpResponse();
+
+ List<DatanodeAdminError> errors = new ArrayList<>();
+ for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+ errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+ }
+ return errors;
}
@Override
- public void stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
throws IOException {
+ DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+ DatanodeDiskBalancerOpRequestProto.newBuilder()
+ .setOpType(DatanodeDiskBalancerOpType.stop);
+ hosts.ifPresent(requestBuilder::addAllHosts);
+
+ DatanodeDiskBalancerOpResponseProto response =
+ submitRequest(Type.DatanodeDiskBalancerOp,
+ builder -> builder.setDatanodeDiskBalancerOpRequest(
+ requestBuilder.build()))
+ .getDatanodeDiskBalancerOpResponse();
+ List<DatanodeAdminError> errors = new ArrayList<>();
+ for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+ errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+ }
+ return errors;
}
@Override
- public void updateDiskBalancerConfiguration(Optional<Double> threshold,
- Optional<Double> bandwidth, Optional<List<String>> hosts)
+ public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+ Optional<Double> threshold, Optional<Long> bandwidthInMB,
+ Optional<Integer> parallelThread, Optional<List<String>> hosts)
throws IOException {
+ HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
+ HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+ threshold.ifPresent(confBuilder::setThreshold);
+ bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
+ parallelThread.ifPresent(confBuilder::setParallelThread);
+
+ DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+ DatanodeDiskBalancerOpRequestProto.newBuilder()
+ .setOpType(DatanodeDiskBalancerOpType.update)
+ .setConf(confBuilder);
+ hosts.ifPresent(requestBuilder::addAllHosts);
+
+ DatanodeDiskBalancerOpResponseProto response =
+ submitRequest(Type.DatanodeDiskBalancerOp,
+ builder -> builder.setDatanodeDiskBalancerOpRequest(
+ requestBuilder.build()))
+ .getDatanodeDiskBalancerOpResponse();
+
+ List<DatanodeAdminError> errors = new ArrayList<>();
+ for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+ errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+ }
+ return errors;
}
@Override
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 766db3429c..d6c552c4b4 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -78,7 +78,8 @@ message ScmContainerLocationRequest {
optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39;
optional ReplicationManagerReportRequestProto replicationManagerReportRequest = 40;
optional ResetDeletedBlockRetryCountRequestProto resetDeletedBlockRetryCountRequest = 41;
- optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 42;
+ optional DatanodeDiskBalancerInfoRequestProto datanodeDiskBalancerInfoRequest = 42;
+ optional DatanodeDiskBalancerOpRequestProto datanodeDiskBalancerOpRequest = 43;
}
message ScmContainerLocationResponse {
@@ -128,7 +129,8 @@ message ScmContainerLocationResponse {
optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39;
optional ReplicationManagerReportResponseProto getReplicationManagerReportResponse = 40;
optional ResetDeletedBlockRetryCountResponseProto resetDeletedBlockRetryCountResponse = 41;
- optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 42;
+ optional DatanodeDiskBalancerInfoResponseProto datanodeDiskBalancerInfoResponse = 42;
+ optional DatanodeDiskBalancerOpResponseProto datanodeDiskBalancerOpResponse = 43;
enum Status {
OK = 1;
@@ -178,6 +180,7 @@ enum Type {
ResetDeletedBlockRetryCount = 36;
GetClosedContainerCount = 37;
DatanodeDiskBalancerInfo= 38;
+ DatanodeDiskBalancerOp = 39;
}
/**
@@ -341,12 +344,29 @@ message DatanodeDiskBalancerInfoRequestProto {
required DatanodeDiskBalancerInfoType infoType = 1;
optional uint32 count = 2;
repeated string hosts = 3;
+ optional DiskBalancerRunningStatus status = 4;
}
message DatanodeDiskBalancerInfoResponseProto {
repeated DatanodeDiskBalancerInfoProto info = 1;
}
+enum DatanodeDiskBalancerOpType{
+ start = 1;
+ stop = 2;
+ update = 3;
+}
+
+message DatanodeDiskBalancerOpRequestProto {
+ required DatanodeDiskBalancerOpType opType = 1;
+ repeated string hosts = 2;
+ optional DiskBalancerConfigurationProto conf = 3;
+}
+
+message DatanodeDiskBalancerOpResponseProto {
+ repeated DatanodeAdminErrorResponseProto failedHosts = 1;
+}
+
/*
Decommission a list of hosts
*/
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 7cee2f6160..231e105aea 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -455,13 +455,19 @@ message ContainerBalancerConfigurationProto {
message DiskBalancerConfigurationProto {
optional double threshold = 1;
- optional double diskBandwidth = 2;
+ optional uint64 diskBandwidthInMB = 2;
optional int32 parallelThread = 3;
}
+enum DiskBalancerRunningStatus {
+ RUNNING = 1;
+ STOPPED = 2;
+ UNKNOWN = 3;
+}
+
message DatanodeDiskBalancerInfoProto {
required DatanodeDetailsProto node = 1;
required double currentVolumeDensitySum = 2;
- optional bool diskBalancerRunning = 3;
+ optional DiskBalancerRunningStatus runningStatus = 3;
optional DiskBalancerConfigurationProto diskBalancerConf = 4;
}
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 665665ee38..81e9ee54d3 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -139,6 +139,7 @@ message SCMHeartbeatRequestProto {
optional PipelineReportsProto pipelineReports = 8;
optional LayoutVersionProto dataNodeLayoutVersion = 9;
optional CommandQueueReportProto commandQueueReport = 10;
+ optional DiskBalancerReportProto diskBalancerReport = 11;
}
message CommandQueueReportProto {
@@ -325,6 +326,7 @@ message SCMCommandProto {
finalizeNewLayoutVersionCommand = 9;
refreshVolumeUsageInfo = 10;
reconstructECContainersCommand = 11;
+ diskBalancerCommand = 12;
}
// TODO: once we start using protoc 3.x, refactor this message using "oneof"
required Type commandType = 1;
@@ -340,6 +342,7 @@ message SCMCommandProto {
finalizeNewLayoutVersionCommandProto = 10;
optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11;
optional ReconstructECContainersCommandProto reconstructECContainersCommandProto = 12;
+ optional DiskBalancerCommandProto diskBalancerCommandProto = 13;
// If running upon Ratis, holds term of underlying RaftServer iff current
@@ -442,6 +445,14 @@ message DatanodeDetailsAndReplicaIndexProto {
required int32 replicaIndex = 2;
}
+/**
+This command asks the datanode to update diskBalancer status
+ */
+message DiskBalancerCommandProto {
+ required bool shouldRun = 1;
+ optional DiskBalancerConfigurationProto diskBalancerConf = 2;
+}
+
/**
This command asks the datanode to create a pipeline.
*/
@@ -480,6 +491,12 @@ message CRLStatusReport {
repeated int64 pendingCrlIds=2;
}
+message DiskBalancerReportProto {
+ required bool isRunning = 1;
+ optional uint64 balancedBytes = 2;
+ optional DiskBalancerConfigurationProto diskBalancerConf = 3;
+}
+
/**
* This command asks the datanode to process a new CRL.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 639c884795..16c995e1eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CRLStatu
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -227,6 +228,14 @@ public final class SCMEvents {
new TypedEvent<>(CRLStatusReportFromDatanode.class,
"Crl_Status_Report");
+ /**
+ * DiskBalancer reports are send out by Datanodes. This report is received by
+ * SCMDatanodeHeartbeatDispatcher and DiskBalancer_Report Event is generated.
+ */
+ public static final TypedEvent<DiskBalancerReportFromDatanode>
+ DISK_BALANCER_REPORT = new TypedEvent<>(
+ DiskBalancerReportFromDatanode.class, "DiskBalancer_Report");
+
/**
* Private Ctor. Never Constructed.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 5ca66e9eef..c6114c52f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -22,7 +22,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -47,10 +48,13 @@ public class DiskBalancerManager {
public static final Logger LOG =
LoggerFactory.getLogger(DiskBalancerManager.class);
+
+ private OzoneConfiguration conf;
private final EventPublisher scmNodeEventPublisher;
private final SCMContext scmContext;
private final NodeManager nodeManager;
private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+ private Map<DatanodeDetails, Long> balancedBytesMap;
private boolean useHostnames;
/**
@@ -60,6 +64,7 @@ public class DiskBalancerManager {
EventPublisher eventPublisher,
SCMContext scmContext,
NodeManager nodeManager) {
+ this.conf = conf;
this.scmNodeEventPublisher = eventPublisher;
this.scmContext = scmContext;
this.nodeManager = nodeManager;
@@ -95,42 +100,60 @@ public class DiskBalancerManager {
* If hosts is null, return status of all datanodes in balancing.
*/
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts, int clientVersion) throws IOException {
- List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
- new ArrayList<>();
+ Optional<List<String>> hosts,
+ Optional<HddsProtos.DiskBalancerRunningStatus> status,
+ int clientVersion) throws IOException {
List<DatanodeDetails> filterDns = null;
if (hosts.isPresent() && !hosts.get().isEmpty()) {
filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
useHostnames);
}
- for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
- HddsProtos.NodeState.HEALTHY)) {
- if (shouldReturnDatanode(filterDns, datanodeDetails)) {
- double volumeDensitySum =
- getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
- statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
- .setCurrentVolumeDensitySum(volumeDensitySum)
- .setDiskBalancerRunning(isRunning(datanodeDetails))
- .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
- DiskBalancerStatus.DUMMY_STATUS)
- .getDiskBalancerConfiguration().toProtobufBuilder())
- .setNode(datanodeDetails.toProto(clientVersion))
- .build());
- }
+ // Filter Running Status by default
+ HddsProtos.DiskBalancerRunningStatus filterStatus = status.orElse(null);
+
+ if (filterDns != null) {
+ return filterDns.stream()
+ .filter(dn -> shouldReturnDatanode(filterStatus, dn))
+ .map(nodeManager::getDatanodeInfo)
+ .map(dn -> getInfoProto(dn, clientVersion))
+ .collect(Collectors.toList());
+ } else {
+ return nodeManager.getAllNodes().stream()
+ .filter(dn -> shouldReturnDatanode(filterStatus, dn))
+ .map(dn -> getInfoProto((DatanodeInfo)dn, clientVersion))
+ .collect(Collectors.toList());
}
- return statusList;
}
- private boolean shouldReturnDatanode(List<DatanodeDetails> hosts,
+ private boolean shouldReturnDatanode(
+ HddsProtos.DiskBalancerRunningStatus status,
DatanodeDetails datanodeDetails) {
- if (hosts == null || hosts.isEmpty()) {
- return isRunning(datanodeDetails);
- } else {
- return hosts.contains(datanodeDetails);
+ boolean shouldReturn = true;
+ // If status specified, do not return if status not match.
+ if (status != null && getRunningStatus(datanodeDetails) != status) {
+ shouldReturn = false;
}
+ return shouldReturn;
}
+ private HddsProtos.DatanodeDiskBalancerInfoProto getInfoProto(
+ DatanodeInfo dn, int clientVersion) {
+ double volumeDensitySum =
+ getVolumeDataDensitySumForDatanodeDetails(dn);
+ HddsProtos.DiskBalancerRunningStatus runningStatus =
+ getRunningStatus(dn);
+ HddsProtos.DatanodeDiskBalancerInfoProto.Builder builder =
+ HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+ .setNode(dn.toProto(clientVersion))
+ .setCurrentVolumeDensitySum(volumeDensitySum)
+ .setRunningStatus(getRunningStatus(dn));
+ if (runningStatus != HddsProtos.DiskBalancerRunningStatus.UNKNOWN) {
+ builder.setDiskBalancerConf(statusMap.get(dn)
+ .getDiskBalancerConfiguration().toProtobufBuilder());
+ }
+ return builder.build();
+ }
/**
* Get volume density for a specific DatanodeDetails node.
*
@@ -144,8 +167,7 @@ public class DiskBalancerManager {
DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails;
double totalCapacity = 0d, totalUsed = 0d;
- for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto :
- datanodeInfo.getStorageReports()) {
+ for (StorageReportProto reportProto : datanodeInfo.getStorageReports()) {
totalCapacity += reportProto.getCapacity();
totalUsed += reportProto.getScmUsed();
}
@@ -162,10 +184,17 @@ public class DiskBalancerManager {
return volumeDensitySum;
}
- private boolean isRunning(DatanodeDetails datanodeDetails) {
- return statusMap
- .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS)
- .isRunning();
+ private HddsProtos.DiskBalancerRunningStatus getRunningStatus(
+ DatanodeDetails datanodeDetails) {
+ if (!statusMap.containsKey(datanodeDetails)) {
+ return HddsProtos.DiskBalancerRunningStatus.UNKNOWN;
+ } else {
+ if (statusMap.get(datanodeDetails).isRunning()) {
+ return HddsProtos.DiskBalancerRunningStatus.RUNNING;
+ } else {
+ return HddsProtos.DiskBalancerRunningStatus.STOPPED;
+ }
+ }
}
@VisibleForTesting
@@ -173,4 +202,24 @@ public class DiskBalancerManager {
statusMap.put(datanodeDetails, new DiskBalancerStatus(true,
new DiskBalancerConfiguration()));
}
-}
\ No newline at end of file
+
+ public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
+ DatanodeDetails dn) {
+ boolean isRunning = reportProto.getIsRunning();
+ DiskBalancerConfiguration diskBalancerConfiguration =
+ reportProto.hasDiskBalancerConf() ?
+ DiskBalancerConfiguration.fromProtobuf(
+ reportProto.getDiskBalancerConf(), conf) :
+ new DiskBalancerConfiguration();
+ statusMap.put(dn, new DiskBalancerStatus(isRunning,
+ diskBalancerConfiguration));
+ if (reportProto.hasBalancedBytes()) {
+ balancedBytesMap.put(dn, reportProto.getBalancedBytes());
+ }
+ }
+
+ @VisibleForTesting
+ public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() {
+ return statusMap;
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
new file mode 100644
index 0000000000..47158972ec
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles DiskBalancer Reports from datanode.
+ */
+public class DiskBalancerReportHandler implements
+ EventHandler<DiskBalancerReportFromDatanode> {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(DiskBalancerReportHandler.class);
+
+ private DiskBalancerManager diskBalancerManager;
+
+ public DiskBalancerReportHandler(DiskBalancerManager diskBalancerManager) {
+ this.diskBalancerManager = diskBalancerManager;
+ }
+
+ @Override
+ public void onMessage(DiskBalancerReportFromDatanode reportFromDatanode,
+ EventPublisher publisher) {
+ Preconditions.checkNotNull(reportFromDatanode);
+ DatanodeDetails dn = reportFromDatanode.getDatanodeDetails();
+ DiskBalancerReportProto diskBalancerReportProto =
+ reportFromDatanode.getReport();
+ Preconditions.checkNotNull(dn,
+ "DiskBalancer Report is missing DatanodeDetails.");
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("Processing diskBalancer report for dn: {}", dn);
+ }
+ try {
+ diskBalancerManager.processDiskBalancerReport(
+ diskBalancerReportProto, dn);
+ } catch (Exception e) {
+ LOGGER.error("Failed to process diskBalancer report={} from dn={}.",
+ diskBalancerReportProto, dn, e);
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
index ed22e80e3c..65f577bcee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -32,8 +32,6 @@ public class DiskBalancerStatus {
private boolean isRunning;
private DiskBalancerConfiguration diskBalancerConfiguration;
- public static final DiskBalancerStatus DUMMY_STATUS =
- new DiskBalancerStatus(false, new DiskBalancerConfiguration());
public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) {
this.isRunning = isRunning;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 8f72375bcd..c1dffe7f4b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -162,6 +162,14 @@ public interface NodeManager extends StorageContainerNodeProtocol,
*/
DatanodeUsageInfo getUsageInfo(DatanodeDetails dn);
+ /**
+ * Get the datanode info of a specified datanode.
+ *
+ * @param dn the usage of which we want to get
+ * @return DatanodeInfo of the specified datanode
+ */
+ DatanodeInfo getDatanodeInfo(DatanodeDetails dn);
+
/**
* Return the node stat of the specified datanode.
* @param datanodeDetails DatanodeDetails.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 7a62088ba7..935bfbc6a8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -860,6 +860,23 @@ public class SCMNodeManager implements NodeManager {
return usageInfo;
}
+ /**
+ * Get the usage info of a specified datanode.
+ *
+ * @param dn the usage of which we want to get
+ * @return DatanodeUsageInfo of the specified datanode
+ */
+ @Override
+ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+ try {
+ return nodeStateManager.getNode(dn);
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Cannot retrieve DatanodeInfo, datanode {} not found.",
+ dn.getUuid());
+ return null;
+ }
+ }
+
/**
* Return the node stat of the specified datanode.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 2f50456211..38a2503b55 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -672,6 +673,13 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
request.getDatanodeDiskBalancerInfoRequest(),
request.getVersion()))
.build();
+ case DatanodeDiskBalancerOp:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setDatanodeDiskBalancerOpResponse(getDatanodeDiskBalancerOp(
+ request.getDatanodeDiskBalancerOpRequest()))
+ .build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
@@ -1183,6 +1191,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
request.getTransactionIdList()))
.build();
}
+
public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
StorageContainerLocationProtocolProtos.
DatanodeDiskBalancerInfoRequestProto request, int clientVersion)
@@ -1195,7 +1204,9 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
break;
case status:
infoProtoList = impl.getDiskBalancerStatus(
- Optional.of(request.getHostsList()), clientVersion);
+ Optional.of(request.getHostsList()),
+ Optional.of(request.getStatus()),
+ clientVersion);
break;
default:
infoProtoList = null;
@@ -1204,4 +1215,43 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
.addAllInfo(infoProtoList)
.build();
}
+
+ public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp(
+ StorageContainerLocationProtocolProtos.
+ DatanodeDiskBalancerOpRequestProto request)
+ throws IOException {
+ List<DatanodeAdminError> errors;
+ switch (request.getOpType()) {
+ case start:
+ errors = impl.startDiskBalancer(
+ Optional.of(request.getConf().getThreshold()),
+ Optional.of(request.getConf().getDiskBandwidthInMB()),
+ Optional.of(request.getConf().getParallelThread()),
+ Optional.of(request.getHostsList()));
+ break;
+ case update:
+ errors = impl.updateDiskBalancerConfiguration(
+ Optional.of(request.getConf().getThreshold()),
+ Optional.of(request.getConf().getDiskBandwidthInMB()),
+ Optional.of(request.getConf().getParallelThread()),
+ Optional.of(request.getHostsList()));
+ break;
+ case stop:
+ errors = impl.stopDiskBalancer(Optional.of(request.getHostsList()));
+ break;
+ default:
+ errors = new ArrayList<>();
+ }
+
+ DatanodeDiskBalancerOpResponseProto.Builder response =
+ DatanodeDiskBalancerOpResponseProto.newBuilder();
+ for (DatanodeAdminError e : errors) {
+ DatanodeAdminErrorResponseProto.Builder error =
+ DatanodeAdminErrorResponseProto.newBuilder();
+ error.setHost(e.getHostname());
+ error.setError(e.getError());
+ response.addFailedHosts(error);
+ }
+ return response.build();
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e9759da960..d3f23089be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1142,7 +1142,9 @@ public class SCMClientProtocolServer implements
@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts, int clientVersion) throws IOException {
+ Optional<List<String>> hosts,
+ Optional<HddsProtos.DiskBalancerRunningStatus> status,
+ int clientVersion) throws IOException {
// check admin authorisation
try {
getScm().checkAdminAccess(getRemoteUser());
@@ -1151,29 +1153,33 @@ public class SCMClientProtocolServer implements
throw e;
}
- return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts,
+ return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status,
clientVersion);
}
@Override
- public void startDiskBalancer(Optional<Double> threshold,
- Optional<Double> bandwidth, Optional<List<String>> hosts)
- throws IOException {
+ public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+ Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+ Optional<List<String>> hosts) throws IOException {
// TODO: Send message to datanodes
+ return null;
}
@Override
- public void stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
throws IOException {
// TODO: Send message to datanodes
+ return null;
}
@Override
- public void updateDiskBalancerConfiguration(Optional<Double> threshold,
- Optional<Double> bandwidth, Optional<List<String>> hosts)
+ public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+ Optional<Double> threshold, Optional<Long> bandwidthInMB,
+ Optional<Integer> parallelThread, Optional<List<String>> hosts)
throws IOException {
// TODO: Send message to datanodes
+ return null;
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 02e1b3fc61..35e15e4efd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
@@ -58,6 +59,7 @@ import java.util.UUID;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DISK_BALANCER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.INCREMENTAL_CONTAINER_REPORT;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
@@ -212,6 +214,15 @@ public final class SCMDatanodeHeartbeatDispatcher {
commandStatusReport));
}
}
+
+ if (heartbeat.hasDiskBalancerReport()) {
+ LOG.debug("Dispatching DiskBalancer Report.");
+ eventPublisher.fireEvent(
+ DISK_BALANCER_REPORT,
+ new DiskBalancerReportFromDatanode(
+ datanodeDetails,
+ heartbeat.getDiskBalancerReport()));
+ }
}
return commands;
@@ -445,4 +456,16 @@ public final class SCMDatanodeHeartbeatDispatcher {
super(datanodeDetails, report);
}
}
+
+ /**
+ * DiskBalancer report event payload with origin.
+ */
+ public static class DiskBalancerReportFromDatanode
+ extends ReportFromDatanode<DiskBalancerReportProto> {
+
+ public DiskBalancerReportFromDatanode(DatanodeDetails datanodeDetails,
+ DiskBalancerReportProto report) {
+ super(datanodeDetails, report);
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b8727173f1..f4317ec9b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerReportHandler;
import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl;
@@ -454,6 +455,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
new PipelineActionHandler(pipelineManager, scmContext, configuration);
CRLStatusReportHandler crlStatusReportHandler =
new CRLStatusReportHandler(certificateStore, configuration);
+ DiskBalancerReportHandler diskBalancerReportHandler =
+ new DiskBalancerReportHandler(diskBalancerManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@@ -528,7 +531,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
-
+ eventQueue.addHandler(SCMEvents.DISK_BALANCER_REPORT,
+ diskBalancerReportHandler);
}
private void initializeCertificateClient() {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index e1eaf251f5..9cb923c078 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -346,7 +346,36 @@ public class MockNodeManager implements NodeManager {
*/
@Override
public List<DatanodeDetails> getAllNodes() {
- return new ArrayList<>(nodeMetricMap.keySet());
+ // mock storage reports for TestDiskBalancer
+ List<DatanodeDetails> healthyNodesWithInfo = new ArrayList<>();
+ for (Map.Entry<DatanodeDetails, SCMNodeStat> entry:
+ nodeMetricMap.entrySet()) {
+ NodeStatus nodeStatus = NodeStatus.inServiceHealthy();
+ if (staleNodes.contains(entry.getKey())) {
+ nodeStatus = NodeStatus.inServiceStale();
+ } else if (deadNodes.contains(entry.getKey())) {
+ nodeStatus = NodeStatus.inServiceDead();
+ }
+ DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus,
+ UpgradeUtils.defaultLayoutVersionProto());
+
+ long capacity = entry.getValue().getCapacity().get();
+ long used = entry.getValue().getScmUsed().get();
+ long remaining = entry.getValue().getRemaining().get();
+ StorageReportProto storage1 = HddsTestUtils.createStorageReport(
+ di.getUuid(), "/data1-" + di.getUuidString(),
+ capacity, used, remaining, null);
+ MetadataStorageReportProto metaStorage1 =
+ HddsTestUtils.createMetadataStorageReport(
+ "/metadata1-" + di.getUuidString(), capacity, used,
+ remaining, null);
+ di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1)));
+ di.updateMetaDataStorageReports(
+ new ArrayList<>(Arrays.asList(metaStorage1)));
+
+ healthyNodesWithInfo.add(di);
+ }
+ return healthyNodesWithInfo;
}
/**
@@ -409,6 +438,26 @@ public class MockNodeManager implements NodeManager {
return new DatanodeUsageInfo(datanodeDetails, stat);
}
+ @Override
+ public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) {
+ DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(),
+ UpgradeUtils.defaultLayoutVersionProto());
+ long capacity = nodeMetricMap.get(dd).getCapacity().get();
+ long used = nodeMetricMap.get(dd).getScmUsed().get();
+ long remaining = nodeMetricMap.get(dd).getRemaining().get();
+ StorageReportProto storage1 = HddsTestUtils.createStorageReport(
+ di.getUuid(), "/data1-" + di.getUuidString(),
+ capacity, used, remaining, null);
+ MetadataStorageReportProto metaStorage1 =
+ HddsTestUtils.createMetadataStorageReport(
+ "/metadata1-" + di.getUuidString(), capacity, used,
+ remaining, null);
+ di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1)));
+ di.updateMetaDataStorageReports(
+ new ArrayList<>(Arrays.asList(metaStorage1)));
+ return di;
+ }
+
/**
* Return the node stat of the specified datanode.
* @param datanodeDetails - datanode details.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 22e01b9770..16f013b63f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -244,6 +244,11 @@ public class SimpleMockNodeManager implements NodeManager {
return null;
}
+ @Override
+ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+ return null;
+ }
+
@Override
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
return null;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
index 541c9764b7..7005dea292 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -21,8 +21,10 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.ozone.test.GenericTestUtils;
@@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -46,6 +49,8 @@ public class TestDiskBalancerManager {
private NodeManager nodeManager;
private OzoneConfiguration conf;
private String storageDir;
+ private DiskBalancerReportHandler diskBalancerReportHandler;
+ private Random random;
@BeforeEach
public void setup() throws Exception {
@@ -56,6 +61,9 @@ public class TestDiskBalancerManager {
nodeManager = new MockNodeManager(true, 3);
diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(),
SCMContext.emptyContext(), nodeManager);
+ diskBalancerReportHandler =
+ new DiskBalancerReportHandler(diskBalancerManager);
+ random = new Random();
}
@Test
@@ -73,6 +81,7 @@ public class TestDiskBalancerManager {
@Test
public void testDatanodeDiskBalancerStatus() throws IOException {
diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0));
+ diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(1));
// Simulate users asking all status of 3 datanodes
List<String> dns = nodeManager.getAllNodes().stream().map(
@@ -81,6 +90,7 @@ public class TestDiskBalancerManager {
List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+ Optional.empty(),
ClientVersion.CURRENT_VERSION);
Assertions.assertEquals(3, statusProtoList.size());
@@ -92,8 +102,32 @@ public class TestDiskBalancerManager {
statusProtoList =
diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+ Optional.empty(),
ClientVersion.CURRENT_VERSION);
Assertions.assertEquals(1, statusProtoList.size());
}
+
+ @Test
+ public void testHandleDiskBalancerReportFromDatanode() {
+ for (DatanodeDetails dn: nodeManager.getAllNodes()) {
+ diskBalancerReportHandler.onMessage(
+ new DiskBalancerReportFromDatanode(dn, generateRandomReport()), null);
+ }
+
+ Assertions.assertEquals(3, diskBalancerManager.getStatusMap().size());
+ }
+
+ private DiskBalancerReportProto generateRandomReport() {
+ return DiskBalancerReportProto.newBuilder()
+ .setIsRunning(random.nextBoolean())
+ .setBalancedBytes(random.nextInt(10000))
+ .setDiskBalancerConf(
+ HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+ .setThreshold(random.nextInt(99))
+ .setParallelThread(random.nextInt(4) + 1)
+ .setDiskBandwidthInMB(random.nextInt(99) + 1)
+ .build())
+ .build();
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 436bccb09d..2be5da3655 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -189,6 +190,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
return null;
}
+ @Override
+ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+ return null;
+ }
+
/**
* Return the node stat of the specified datanode.
*
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 6eb9c587b9..bc25a41795 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -505,31 +505,34 @@ public class ContainerOperationClient implements ScmClient {
}
@Override
- public void startDiskBalancer(Optional<Double> threshold,
- Optional<Double> bandwidth, Optional<List<String>> hosts)
- throws IOException {
- storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
- hosts);
+ public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+ Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+ Optional<List<String>> hosts) throws IOException {
+ return storageContainerLocationClient.startDiskBalancer(threshold,
+ bandwidthInMB, parallelThread, hosts);
}
@Override
- public void stopDiskBalancer(Optional<List<String>> hosts)
+ public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
throws IOException {
- storageContainerLocationClient.stopDiskBalancer(hosts);
+ return storageContainerLocationClient.stopDiskBalancer(hosts);
}
@Override
public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
- Optional<List<String>> hosts) throws IOException {
+ Optional<List<String>> hosts,
+ Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+ throws IOException {
return storageContainerLocationClient.getDiskBalancerStatus(hosts,
- ClientVersion.CURRENT_VERSION);
+ runningStatus, ClientVersion.CURRENT_VERSION);
}
@Override
- public void updateDiskBalancerConfiguration(Optional<Double> threshold,
- Optional<Double> bandwidth, Optional<List<String>> hosts)
+ public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+ Optional<Double> threshold, Optional<Long> bandwidth,
+ Optional<Integer> parallelThread, Optional<List<String>> hosts)
throws IOException {
- storageContainerLocationClient.updateDiskBalancerConfiguration(threshold,
- bandwidth, hosts);
+ return storageContainerLocationClient.updateDiskBalancerConfiguration(
+ threshold, bandwidth, parallelThread, hosts);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
index 267209cf03..32719f0d83 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -54,6 +55,7 @@ public class TestDiskBalancer {
private static ScmClient storageClient;
private static MiniOzoneCluster cluster;
private static OzoneConfiguration ozoneConf;
+ private static DiskBalancerManager diskBalancerManager;
@BeforeClass
public static void setup() throws Exception {
@@ -63,6 +65,8 @@ public class TestDiskBalancer {
cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
storageClient = new ContainerOperationClient(ozoneConf);
cluster.waitForClusterToBeReady();
+ diskBalancerManager = cluster.getStorageContainerManager()
+ .getDiskBalancerManager();
for (DatanodeDetails dn: cluster.getStorageContainerManager()
.getScmNodeManager().getAllNodes()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org