Posted to commits@ozone.apache.org by yi...@apache.org on 2023/02/12 13:56:14 UTC

[ozone] 02/04: HDDS-7155. [DiskBalancer] Create interface between SCM and DN (#3701)

This is an automated email from the ASF dual-hosted git repository.

yiyang0203 pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit c4a4cc0990d0dc8ea64c2597a4de3449b51a9ab5
Author: Symious <yi...@foxmail.com>
AuthorDate: Wed Sep 7 12:19:57 2022 +0800

    HDDS-7155. [DiskBalancer] Create interface between SCM and DN (#3701)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   4 +
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |  21 ++--
 .../protocol/StorageContainerLocationProtocol.java |  17 ++--
 .../scm/storage/DiskBalancerConfiguration.java     |  39 +++----
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 .../common/report/DiskBalancerReportPublisher.java |  66 ++++++++++++
 .../common/report/ReportPublisherFactory.java      |   3 +
 .../common/statemachine/DatanodeStateMachine.java  |   2 +
 .../common/statemachine/StateContext.java          |  13 +++
 .../commandhandler/DiskBalancerCommandHandler.java | 112 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |   7 ++
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   6 ++
 .../protocol/commands/DiskBalancerCommand.java     |  74 ++++++++++++++
 .../common/report/TestReportPublisher.java         |  42 ++++++++
 .../common/report/TestReportPublisherFactory.java  |  12 +++
 ...inerLocationProtocolClientSideTranslatorPB.java |  81 +++++++++++++--
 .../src/main/proto/ScmAdminProtocol.proto          |  24 ++++-
 .../interface-client/src/main/proto/hdds.proto     |  10 +-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  17 ++++
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   9 ++
 .../hadoop/hdds/scm/node/DiskBalancerManager.java  | 111 ++++++++++++++------
 .../hdds/scm/node/DiskBalancerReportHandler.java   |  65 ++++++++++++
 .../hadoop/hdds/scm/node/DiskBalancerStatus.java   |   2 -
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |   8 ++
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  17 ++++
 ...inerLocationProtocolServerSideTranslatorPB.java |  52 +++++++++-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  22 ++--
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  23 +++++
 .../hdds/scm/server/StorageContainerManager.java   |   6 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  51 +++++++++-
 .../hdds/scm/container/SimpleMockNodeManager.java  |   5 +
 .../hdds/scm/node/TestDiskBalancerManager.java     |  34 +++++++
 .../testutils/ReplicationNodeManagerMock.java      |   6 ++
 .../hdds/scm/cli/ContainerOperationClient.java     |  29 +++---
 .../hadoop/ozone/scm/node/TestDiskBalancer.java    |   4 +
 35 files changed, 902 insertions(+), 100 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index cb258dfa74..7b3a9c0dd7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -46,6 +46,10 @@ public final class HddsConfigKeys {
       "hdds.pipeline.report.interval";
   public static final String HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT =
       "60s";
+  public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL =
+      "hdds.disk.balancer.report.interval";
+  public static final String HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT =
+      "60s";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL =
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 76c07d1c1b..98e6e26e87 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -400,7 +400,8 @@ public interface ScmClient extends Closeable {
       throws IOException;
 
   /**
-   * Get DiskBalancer status.
+   * Get DiskBalancer report.
+   * REPORT shows the current volume density of datanodes.
    * @param count top datanodes that need balancing
    * @return List of DatanodeDiskBalancerInfo.
    * @throws IOException
@@ -410,33 +411,39 @@ public interface ScmClient extends Closeable {
 
   /**
    * Get DiskBalancer status.
+   * STATUS shows the running status of DiskBalancer on datanodes.
   * @param hosts If hosts is not null, return the status of those hosts; if
   *              hosts is null, return the status of all balancing datanodes.
    * @return List of DatanodeDiskBalancerInfo.
    * @throws IOException
    */
   List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts) throws IOException;
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+      throws IOException;
 
   /**
    * Start DiskBalancer.
    */
-  void startDiskBalancer(
+  List<DatanodeAdminError> startDiskBalancer(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 
   /**
    * Stop DiskBalancer.
    */
-  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+  List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException;
 
 
   /**
    * Update DiskBalancer Configuration.
    */
-  void updateDiskBalancerConfiguration(
+  List<DatanodeAdminError> updateDiskBalancerConfiguration(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidth,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 }
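
A minimal usage sketch of the updated client API (the scmClient instance,
imports, and host name below are illustrative, not part of this commit):

    List<DatanodeAdminError> errors = scmClient.startDiskBalancer(
        Optional.of(10d),    // threshold, in percent
        Optional.of(20L),    // bandwidthInMB
        Optional.of(5),      // parallelThread
        Optional.of(Collections.singletonList("dn1.example.com")));
    if (!errors.isEmpty()) {
      // Each entry names a host that failed the operation, with its error.
    }
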
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 56ed8c7086..afdf36a5f5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -441,26 +441,31 @@ public interface StorageContainerLocationProtocol extends Closeable {
    * Get DiskBalancer status.
    */
   List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException;
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus,
+      int clientVersion) throws IOException;
 
   /**
    * Start DiskBalancer.
    */
-  void startDiskBalancer(
+  List<DatanodeAdminError> startDiskBalancer(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 
   /**
    * Stop DiskBalancer.
    */
-  void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+  List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
+      throws IOException;
 
   /**
    * Update DiskBalancer Configuration.
    */
-  void updateDiskBalancerConfiguration(
+  List<DatanodeAdminError> updateDiskBalancerConfiguration(
       Optional<Double> threshold,
-      Optional<Double> bandwidth,
+      Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread,
       Optional<List<String>> hosts) throws IOException;
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
index 704b383679..43417f2e81 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
@@ -36,7 +36,7 @@ public final class DiskBalancerConfiguration {
   private static final Logger LOG =
       LoggerFactory.getLogger(DiskBalancerConfiguration.class);
 
-  @Config(key = "volume.density.threshold", type = ConfigType.AUTO,
+  @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE,
       defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
       description = "Threshold is a percentage in the range of 0 to 100. A " +
           "datanode is considered balanced if for each volume, the " +
@@ -45,12 +45,12 @@ public final class DiskBalancerConfiguration {
           " of the entire datanode) no more than the threshold.")
   private double threshold = 10d;
 
-  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG,
       defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
       description = "The max balance speed.")
-  private double diskBandwidth = 10;
+  private long diskBandwidthInMB = 10;
 
-  @Config(key = "parallel.thread", type = ConfigType.AUTO,
+  @Config(key = "parallel.thread", type = ConfigType.INT,
       defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
       description = "The max parallel balance thread count.")
   private int parallelThread = 5;
@@ -86,21 +86,21 @@ public final class DiskBalancerConfiguration {
    *
   * @return max disk bandwidth in MB per second
    */
-  public double getDiskBandwidth() {
-    return diskBandwidth;
+  public long getDiskBandwidthInMB() {
+    return diskBandwidthInMB;
   }
 
   /**
    * Sets the disk bandwidth value for Disk Balancer.
    *
-   * @param diskBandwidth the bandwidth to control balance speed
+   * @param diskBandwidthInMB the bandwidth to control balance speed
    */
-  public void setDiskBandwidth(double diskBandwidth) {
-    if (diskBandwidth <= 0d) {
+  public void setDiskBandwidthInMB(long diskBandwidthInMB) {
+    if (diskBandwidthInMB <= 0L) {
       throw new IllegalArgumentException(
-          "diskBandwidth must be a value larger than 0.");
+          "diskBandwidthInMB must be a value larger than 0.");
     }
-    this.diskBandwidth = diskBandwidth;
+    this.diskBandwidthInMB = diskBandwidthInMB;
   }
 
   /**
@@ -124,6 +124,7 @@ public final class DiskBalancerConfiguration {
     }
     this.parallelThread = parallelThread;
   }
+
   @Override
   public String toString() {
     return String.format("Disk Balancer Configuration values:%n" +
@@ -132,7 +133,7 @@ public final class DiskBalancerConfiguration {
             "%-50s %s%n" +
             "%-50s %s%n",
             "Key", "Value",
-        "Threshold", threshold, "Max disk bandwidth", diskBandwidth,
+        "Threshold", threshold, "Max disk bandwidth", diskBandwidthInMB,
         "Parallel Thread", parallelThread);
   }
 
@@ -141,21 +142,21 @@ public final class DiskBalancerConfiguration {
         HddsProtos.DiskBalancerConfigurationProto.newBuilder();
 
     builder.setThreshold(threshold)
-        .setDiskBandwidth(diskBandwidth)
+        .setDiskBandwidthInMB(diskBandwidthInMB)
         .setParallelThread(parallelThread);
     return builder;
   }
 
-  static DiskBalancerConfiguration fromProtobuf(
+  public static DiskBalancerConfiguration fromProtobuf(
       @NotNull HddsProtos.DiskBalancerConfigurationProto proto,
-      @NotNull OzoneConfiguration ozoneConfiguration) {
+      @NotNull ConfigurationSource configurationSource) {
     DiskBalancerConfiguration config =
-        ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
+        configurationSource.getObject(DiskBalancerConfiguration.class);
     if (proto.hasThreshold()) {
       config.setThreshold(proto.getThreshold());
     }
-    if (proto.hasDiskBandwidth()) {
-      config.setDiskBandwidth(proto.getDiskBandwidth());
+    if (proto.hasDiskBandwidthInMB()) {
+      config.setDiskBandwidthInMB(proto.getDiskBandwidthInMB());
     }
     if (proto.hasParallelThread()) {
       config.setParallelThread(proto.getParallelThread());
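
With fromProtobuf now public and accepting any ConfigurationSource, the
configuration can round-trip through its proto form. A sketch, assuming an
OzoneConfiguration as the configuration source:

    OzoneConfiguration ozoneConf = new OzoneConfiguration();
    DiskBalancerConfiguration conf =
        ozoneConf.getObject(DiskBalancerConfiguration.class);
    conf.setDiskBandwidthInMB(20L);
    HddsProtos.DiskBalancerConfigurationProto proto =
        conf.toProtobufBuilder().build();
    // Rebuild on the receiving side; unset proto fields keep the defaults
    // loaded from the configuration source.
    DiskBalancerConfiguration restored =
        DiskBalancerConfiguration.fromProtobuf(proto, ozoneConf);
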
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c6adf0c2db..8e3c78b35b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -365,6 +365,14 @@
       datanode periodically send pipeline report to SCM. Unit could be
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>hdds.disk.balancer.report.interval</name>
+    <value>60s</value>
+    <tag>OZONE, CONTAINER, DISK_BALANCER</tag>
+    <description>Time interval for the datanode to send the disk balancer
+      report. Each datanode periodically sends its disk balancer report to
+      SCM. Unit could be defined with postfix (ns,ms,s,m,h,d)</description>
+  </property>
 
 
   <property>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
new file mode 100644
index 0000000000..2bb78c1ee6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/DiskBalancerReportPublisher.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.report;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT;
+
+/**
+ * Publishes the DiskBalancer report, which is sent to SCM as part of the
+ * heartbeat. The DiskBalancer report consists of the following information:
+ *   - isRunning
+ *   - balancedBytes
+ *   - diskBalancerConf
+ */
+public class DiskBalancerReportPublisher extends
+    ReportPublisher<DiskBalancerReportProto> {
+
+  private Long diskBalancerReportInterval = null;
+
+  @Override
+  protected long getReportFrequency() {
+    if (diskBalancerReportInterval == null) {
+      diskBalancerReportInterval = getConf().getTimeDuration(
+          HDDS_DISK_BALANCER_REPORT_INTERVAL,
+          HDDS_DISK_BALANCER_REPORT_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+
+      long heartbeatFrequency = HddsServerUtil.getScmHeartbeatInterval(
+          getConf());
+
+      Preconditions.checkState(
+          heartbeatFrequency <= diskBalancerReportInterval,
+          HDDS_DISK_BALANCER_REPORT_INTERVAL +
+              " cannot be configured lower than heartbeat frequency " +
+              heartbeatFrequency + ".");
+    }
+    return diskBalancerReportInterval;
+  }
+
+  @Override
+  protected DiskBalancerReportProto getReport() {
+    return getContext().getParent().getContainer().getDiskBalancerReport();
+  }
+}
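
getReportFrequency() above rejects a report interval shorter than the
heartbeat interval. A sketch of a configuration that would trip the check,
assuming hdds.heartbeat.interval is the key read by
HddsServerUtil.getScmHeartbeatInterval:

    OzoneConfiguration conf = new OzoneConfiguration();
    conf.set("hdds.disk.balancer.report.interval", "30s");
    conf.set("hdds.heartbeat.interval", "60s");
    DiskBalancerReportPublisher publisher = new DiskBalancerReportPublisher();
    publisher.setConf(conf);
    // The first report scheduling would now fail the Preconditions.checkState,
    // since the report interval may not be shorter than the heartbeat.
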
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
index 3be1b5e077..19d2806919 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 
@@ -55,6 +56,8 @@ public class ReportPublisherFactory {
     report2publisher.put(PipelineReportsProto.class,
             PipelineReportPublisher.class);
     report2publisher.put(CRLStatusReport.class, CRLStatusReportPublisher.class);
+    report2publisher.put(DiskBalancerReportProto.class,
+        DiskBalancerReportPublisher.class);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 01d114245a..86d92122cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Comm
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CreatePipelineCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DiskBalancerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
@@ -228,6 +229,7 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new SetNodeOperationalStateCommandHandler(conf))
         .addHandler(new FinalizeNewLayoutVersionCommandHandler())
         .addHandler(new RefreshVolumeUsageCommandHandler())
+        .addHandler(new DiskBalancerCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index c4d3428a80..c75c7a455d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -98,6 +99,9 @@ public class StateContext {
   @VisibleForTesting
   static final String CRL_STATUS_REPORT_PROTO_NAME =
       CRLStatusReport.getDescriptor().getFullName();
+  @VisibleForTesting
+  static final String DISK_BALANCER_REPORT_PROTO_NAME =
+      DiskBalancerReportProto.getDescriptor().getFullName();
 
   static final Logger LOG =
       LoggerFactory.getLogger(StateContext.class);
@@ -113,6 +117,7 @@ public class StateContext {
   private final AtomicReference<Message> nodeReport;
   private final AtomicReference<Message> pipelineReports;
   private final AtomicReference<Message> crlStatusReport;
+  private final AtomicReference<Message> diskBalancerReport;
   // Incremental reports are queued in the map below
   private final Map<InetSocketAddress, List<Message>>
       incrementalReportsQueue;
@@ -172,6 +177,7 @@ public class StateContext {
     nodeReport = new AtomicReference<>();
     pipelineReports = new AtomicReference<>();
     crlStatusReport = new AtomicReference<>(); // Certificate Revocation List
+    diskBalancerReport = new AtomicReference<>();
     endpoints = new HashSet<>();
     containerActions = new HashMap<>();
     pipelineActions = new HashMap<>();
@@ -197,6 +203,8 @@ public class StateContext {
     type2Reports.put(PIPELINE_REPORTS_PROTO_NAME, pipelineReports);
     fullReportTypeList.add(CRL_STATUS_REPORT_PROTO_NAME);
     type2Reports.put(CRL_STATUS_REPORT_PROTO_NAME, crlStatusReport);
+    fullReportTypeList.add(DISK_BALANCER_REPORT_PROTO_NAME);
+    type2Reports.put(DISK_BALANCER_REPORT_PROTO_NAME, diskBalancerReport);
   }
 
   /**
@@ -929,6 +937,11 @@ public class StateContext {
     return crlStatusReport.get();
   }
 
+  @VisibleForTesting
+  public Message getDiskBalancerReport() {
+    return diskBalancerReport.get();
+  }
+
   public void configureReconHeartbeatFrequency() {
     reconHeartbeatFrequency.set(getReconHeartbeatInterval(conf));
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
new file mode 100644
index 0000000000..705c0af0e1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for DiskBalancer command received from SCM.
+ */
+public class DiskBalancerCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a DiskBalancerCommand handler.
+   */
+  public DiskBalancerCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    // TODO: Do start/stop/update operation
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.diskBalancerCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+
+  @Override
+  public int getQueuedCount() {
+    return 0;
+  }
+
+  private void startDiskBalancer(DiskBalancerConfiguration configuration) {
+    // TODO: add implementation to start DiskBalancer
+  }
+
+  private void stopDiskBalancer() {
+    // TODO: add implementation to stop DiskBalancer
+  }
+
+  private void updateDiskBalancer(
+      DiskBalancerConfiguration configuration) {
+    // TODO: add implementation to update DiskBalancer configuration
+  }
+}
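
handle() is still a stub in this commit. One plausible shape for the eventual
dispatch, purely illustrative and not the committed logic:

    // Hypothetical body for handle(), using the command type added below:
    DiskBalancerCommand diskBalancerCommand = (DiskBalancerCommand) command;
    if (diskBalancerCommand.isShouldRun()) {
      // Covers both the initial start and a configuration update.
      startDiskBalancer(diskBalancerCommand.getDiskBalancerConfiguration());
    } else {
      stopDiskBalancer();
    }
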
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 3a1bd8ffb3..ac5a14b6aa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
 import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
@@ -418,6 +419,12 @@ public class HeartbeatEndpointTask
             commandResponseProto.getRefreshVolumeUsageCommandProto());
         processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
         break;
+      case diskBalancerCommand:
+        DiskBalancerCommand diskBalancerCommand =
+            DiskBalancerCommand.getFromProtobuf(
+                commandResponseProto.getDiskBalancerCommandProto(), conf);
+        processCommonCommand(commandResponseProto, diskBalancerCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index b3db557b6f..14a89bb867 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -521,4 +522,9 @@ public class OzoneContainer {
   StorageVolumeChecker getVolumeChecker(ConfigurationSource conf) {
     return new StorageVolumeChecker(conf, new Timer());
   }
+
+  public DiskBalancerReportProto getDiskBalancerReport() {
+    // TODO: Return real disk balancer report
+    return null;
+  }
 }
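
getDiskBalancerReport() returns null for now; the shape it will eventually
build is the DiskBalancerReportProto defined later in this commit. An
illustrative placeholder (values are not from this commit):

    return DiskBalancerReportProto.newBuilder()
        .setIsRunning(false)   // no balancer service attached yet
        .setBalancedBytes(0L)
        .build();
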
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
new file mode 100644
index 0000000000..ea97f34ca0
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DiskBalancerCommand.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerCommandProto;
+
+/**
+ * Command that asks a datanode to update its DiskBalancer status.
+ */
+public class DiskBalancerCommand extends SCMCommand<DiskBalancerCommandProto> {
+
+  private final boolean shouldRun;
+  private final DiskBalancerConfiguration diskBalancerConfiguration;
+
+  public DiskBalancerCommand(final boolean shouldRun,
+      final DiskBalancerConfiguration diskBalancerConfiguration) {
+    this.shouldRun = shouldRun;
+    this.diskBalancerConfiguration = diskBalancerConfiguration;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.diskBalancerCommand;
+  }
+
+  @Override
+  public DiskBalancerCommandProto getProto() {
+    return DiskBalancerCommandProto.newBuilder()
+        .setShouldRun(shouldRun)
+        .setDiskBalancerConf(diskBalancerConfiguration.toProtobufBuilder())
+        .build();
+  }
+
+  public static DiskBalancerCommand getFromProtobuf(DiskBalancerCommandProto
+      diskbalancerCommandProto, ConfigurationSource configuration) {
+    Preconditions.checkNotNull(diskbalancerCommandProto);
+    return new DiskBalancerCommand(diskbalancerCommandProto.getShouldRun(),
+        DiskBalancerConfiguration.fromProtobuf(
+            diskbalancerCommandProto.getDiskBalancerConf(), configuration));
+  }
+
+  public boolean isShouldRun() {
+    return shouldRun;
+  }
+
+  public DiskBalancerConfiguration getDiskBalancerConfiguration() {
+    return diskBalancerConfiguration;
+  }
+}
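
A round-trip sketch for the new command, mirroring how HeartbeatEndpointTask
decodes it (the OzoneConfiguration instances are illustrative):

    DiskBalancerConfiguration balancerConf = new OzoneConfiguration()
        .getObject(DiskBalancerConfiguration.class);
    DiskBalancerCommand cmd = new DiskBalancerCommand(true, balancerConf);
    // Serialize on SCM, then decode on the datanode side.
    DiskBalancerCommand decoded = DiskBalancerCommand.getFromProtobuf(
        cmd.getProto(), new OzoneConfiguration());
    assert decoded.isShouldRun();
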
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index d611caf126..c09261d4ab 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hdds.HddsIdFactory;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.hdds.protocol.proto.
@@ -39,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.jupiter.api.Assertions;
@@ -215,6 +218,45 @@ public class TestReportPublisher {
     executorService.shutdown();
   }
 
+  @Test
+  public void testDiskBalancerReportPublisher() throws IOException {
+    StateContext dummyContext = Mockito.mock(StateContext.class);
+    DatanodeStateMachine dummyStateMachine =
+        Mockito.mock(DatanodeStateMachine.class);
+    OzoneContainer dummyContainer = Mockito.mock(OzoneContainer.class);
+
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    builder.setIsRunning(true);
+    builder.setBalancedBytes(1L);
+    builder.setDiskBalancerConf(
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder().build());
+    DiskBalancerReportProto dummyReport = builder.build();
+
+    ReportPublisher publisher = new DiskBalancerReportPublisher();
+    when(dummyContext.getParent()).thenReturn(dummyStateMachine);
+    when(dummyStateMachine.getContainer()).thenReturn(dummyContainer);
+    when(dummyContainer.getDiskBalancerReport()).thenReturn(dummyReport);
+    publisher.setConf(config);
+
+    ScheduledExecutorService executorService = HadoopExecutors
+        .newScheduledThreadPool(1,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Unit test ReportManager Thread - %d").build());
+    publisher.init(dummyContext, executorService);
+    Message report =
+        ((DiskBalancerReportPublisher) publisher).getReport();
+    Assertions.assertNotNull(report);
+    for (Descriptors.FieldDescriptor descriptor :
+        report.getDescriptorForType().getFields()) {
+      if (descriptor.getNumber() ==
+          DiskBalancerReportProto.ISRUNNING_FIELD_NUMBER) {
+        Assertions.assertEquals(true, report.getField(descriptor));
+      }
+    }
+    executorService.shutdown();
+  }
+
   /**
    * Get a datanode details.
    *
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
index ea915f98a2..0b43bb93f9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -66,6 +67,17 @@ public class TestReportPublisherFactory {
     Assert.assertEquals(conf, publisher.getConf());
   }
 
+  @Test
+  public void testGetDiskBalancerReportPublisher() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ReportPublisherFactory factory = new ReportPublisherFactory(conf);
+    ReportPublisher publisher = factory
+        .getPublisherFor(DiskBalancerReportProto.class);
+    Assert.assertEquals(DiskBalancerReportPublisher.class,
+        publisher.getClass());
+    Assert.assertEquals(conf, publisher.getConf());
+  }
+
   @Test
   public void testInvalidReportPublisher() {
     OzoneConfiguration conf = new OzoneConfiguration();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 0d35798352..936e31d760 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -45,6 +45,9 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
@@ -1052,11 +1055,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
 
   @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException {
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> status,
+      int clientVersion) throws IOException {
     DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
         DatanodeDiskBalancerInfoRequestProto.newBuilder()
             .setInfoType(DatanodeDiskBalancerInfoType.status);
     hosts.ifPresent(requestBuilder::addAllHosts);
+    status.ifPresent(requestBuilder::setStatus);
     DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();
 
     DatanodeDiskBalancerInfoResponseProto response =
@@ -1068,22 +1074,83 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   @Override
-  public void startDiskBalancer(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
-      throws IOException {
+  public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+      Optional<List<String>> hosts) throws IOException {
+    HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+    threshold.ifPresent(confBuilder::setThreshold);
+    bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
+    parallelThread.ifPresent(confBuilder::setParallelThread);
+
+    DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerOpRequestProto.newBuilder()
+            .setOpType(DatanodeDiskBalancerOpType.start)
+            .setConf(confBuilder);
+    hosts.ifPresent(requestBuilder::addAllHosts);
 
+    DatanodeDiskBalancerOpResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerOp,
+            builder -> builder.setDatanodeDiskBalancerOpRequest(
+                requestBuilder.build()))
+            .getDatanodeDiskBalancerOpResponse();
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+      errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+    }
+    return errors;
   }
 
   @Override
-  public void stopDiskBalancer(Optional<List<String>> hosts)
+  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
       throws IOException {
+    DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerOpRequestProto.newBuilder()
+            .setOpType(DatanodeDiskBalancerOpType.stop);
+    hosts.ifPresent(requestBuilder::addAllHosts);
+
+    DatanodeDiskBalancerOpResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerOp,
+            builder -> builder.setDatanodeDiskBalancerOpRequest(
+                requestBuilder.build()))
+            .getDatanodeDiskBalancerOpResponse();
 
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+      errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+    }
+    return errors;
   }
 
   @Override
-  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
+    HddsProtos.DiskBalancerConfigurationProto.Builder confBuilder =
+        HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+    threshold.ifPresent(confBuilder::setThreshold);
+    bandwidthInMB.ifPresent(confBuilder::setDiskBandwidthInMB);
+    parallelThread.ifPresent(confBuilder::setParallelThread);
+
+    DatanodeDiskBalancerOpRequestProto.Builder requestBuilder =
+        DatanodeDiskBalancerOpRequestProto.newBuilder()
+            .setOpType(DatanodeDiskBalancerOpType.update)
+            .setConf(confBuilder);
+    hosts.ifPresent(requestBuilder::addAllHosts);
+
+    DatanodeDiskBalancerOpResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerOp,
+            builder -> builder.setDatanodeDiskBalancerOpRequest(
+                requestBuilder.build()))
+            .getDatanodeDiskBalancerOpResponse();
+
+    List<DatanodeAdminError> errors = new ArrayList<>();
+    for (DatanodeAdminErrorResponseProto e : response.getFailedHostsList()) {
+      errors.add(new DatanodeAdminError(e.getHost(), e.getError()));
+    }
+    return errors;
   }
 
   @Override
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 766db3429c..d6c552c4b4 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -78,7 +78,8 @@ message ScmContainerLocationRequest {
   optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39;
   optional ReplicationManagerReportRequestProto replicationManagerReportRequest = 40;
   optional ResetDeletedBlockRetryCountRequestProto resetDeletedBlockRetryCountRequest = 41;
-  optional DatanodeDiskBalancerInfoRequestProto DatanodeDiskBalancerInfoRequest = 42;
+  optional DatanodeDiskBalancerInfoRequestProto datanodeDiskBalancerInfoRequest = 42;
+  optional DatanodeDiskBalancerOpRequestProto datanodeDiskBalancerOpRequest = 43;
 }
 
 message ScmContainerLocationResponse {
@@ -128,7 +129,8 @@ message ScmContainerLocationResponse {
   optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39;
   optional ReplicationManagerReportResponseProto getReplicationManagerReportResponse = 40;
   optional ResetDeletedBlockRetryCountResponseProto resetDeletedBlockRetryCountResponse = 41;
-  optional DatanodeDiskBalancerInfoResponseProto DatanodeDiskBalancerInfoResponse = 42;
+  optional DatanodeDiskBalancerInfoResponseProto datanodeDiskBalancerInfoResponse = 42;
+  optional DatanodeDiskBalancerOpResponseProto datanodeDiskBalancerOpResponse = 43;
 
   enum Status {
     OK = 1;
@@ -178,6 +180,7 @@ enum Type {
   ResetDeletedBlockRetryCount = 36;
   GetClosedContainerCount = 37;
  DatanodeDiskBalancerInfo = 38;
+  DatanodeDiskBalancerOp = 39;
 }
 
 /**
@@ -341,12 +344,29 @@ message DatanodeDiskBalancerInfoRequestProto {
   required DatanodeDiskBalancerInfoType infoType = 1;
   optional uint32 count = 2;
   repeated string hosts = 3;
+  optional DiskBalancerRunningStatus status = 4;
 }
 
 message DatanodeDiskBalancerInfoResponseProto {
   repeated DatanodeDiskBalancerInfoProto info = 1;
 }
 
+enum DatanodeDiskBalancerOpType {
+  start = 1;
+  stop = 2;
+  update = 3;
+}
+
+message DatanodeDiskBalancerOpRequestProto {
+  required DatanodeDiskBalancerOpType opType = 1;
+  repeated string hosts = 2;
+  optional DiskBalancerConfigurationProto conf = 3;
+}
+
+message DatanodeDiskBalancerOpResponseProto {
+  repeated DatanodeAdminErrorResponseProto failedHosts = 1;
+}
+
 /*
   Decommission a list of hosts
 */
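
The client-side translator shown earlier assembles these messages; for
reference, a start request built directly (host names illustrative):

    DatanodeDiskBalancerOpRequestProto request =
        DatanodeDiskBalancerOpRequestProto.newBuilder()
            .setOpType(DatanodeDiskBalancerOpType.start)
            .addAllHosts(Arrays.asList("dn1", "dn2"))
            .setConf(HddsProtos.DiskBalancerConfigurationProto.newBuilder()
                .setThreshold(10))
            .build();
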
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 7cee2f6160..231e105aea 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -455,13 +455,19 @@ message ContainerBalancerConfigurationProto {
 
 message DiskBalancerConfigurationProto {
     optional double threshold = 1;
-    optional double diskBandwidth = 2;
+    optional uint64 diskBandwidthInMB = 2;
     optional int32 parallelThread = 3;
 }
 
+enum DiskBalancerRunningStatus {
+    RUNNING = 1;
+    STOPPED = 2;
+    UNKNOWN = 3;
+}
+
 message DatanodeDiskBalancerInfoProto {
     required DatanodeDetailsProto node = 1;
     required double currentVolumeDensitySum = 2;
-    optional bool diskBalancerRunning = 3;
+    optional DiskBalancerRunningStatus runningStatus = 3;
     optional DiskBalancerConfigurationProto diskBalancerConf = 4;
 }
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 665665ee38..81e9ee54d3 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -139,6 +139,7 @@ message SCMHeartbeatRequestProto {
   optional PipelineReportsProto pipelineReports = 8;
   optional LayoutVersionProto dataNodeLayoutVersion = 9;
   optional CommandQueueReportProto commandQueueReport = 10;
+  optional DiskBalancerReportProto diskBalancerReport = 11;
 }
 
 message CommandQueueReportProto {
@@ -325,6 +326,7 @@ message SCMCommandProto {
     finalizeNewLayoutVersionCommand = 9;
     refreshVolumeUsageInfo = 10;
     reconstructECContainersCommand = 11;
+    diskBalancerCommand = 12;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -340,6 +342,7 @@ message SCMCommandProto {
   finalizeNewLayoutVersionCommandProto = 10;
   optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11;
   optional ReconstructECContainersCommandProto reconstructECContainersCommandProto = 12;
+  optional DiskBalancerCommandProto diskBalancerCommandProto = 13;
 
 
   // If running upon Ratis, holds term of underlying RaftServer iff current
@@ -442,6 +445,14 @@ message DatanodeDetailsAndReplicaIndexProto {
     required int32 replicaIndex = 2;
 }
 
+/**
+This command asks the datanode to update its DiskBalancer status.
+ */
+message DiskBalancerCommandProto {
+  required bool shouldRun = 1;
+  optional DiskBalancerConfigurationProto diskBalancerConf = 2;
+}
+
 /**
 This command asks the datanode to create a pipeline.
 */
@@ -480,6 +491,12 @@ message CRLStatusReport {
   repeated int64 pendingCrlIds=2;
 }
 
+message DiskBalancerReportProto {
+  required bool isRunning = 1;
+  optional uint64 balancedBytes = 2;
+  optional DiskBalancerConfigurationProto diskBalancerConf = 3;
+}
+
 /**
  * This command asks the datanode to process a new CRL.
  */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 639c884795..16c995e1eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CRLStatu
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.CommandStatusReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -227,6 +228,14 @@ public final class SCMEvents {
       new TypedEvent<>(CRLStatusReportFromDatanode.class,
           "Crl_Status_Report");
 
+  /**
+   * DiskBalancer reports are sent out by datanodes. The report is received
+   * by SCMDatanodeHeartbeatDispatcher, which generates a DiskBalancer_Report
+   * event.
+   */
+  public static final TypedEvent<DiskBalancerReportFromDatanode>
+      DISK_BALANCER_REPORT = new TypedEvent<>(
+          DiskBalancerReportFromDatanode.class, "DiskBalancer_Report");
+
   /**
    * Private Ctor. Never Constructed.
    */
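
Like the other datanode report events, DISK_BALANCER_REPORT is meant to be
wired to a handler on the SCM event queue. A sketch, assuming the
DiskBalancerReportHandler added in this commit takes the DiskBalancerManager
in its constructor:

    EventQueue eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.DISK_BALANCER_REPORT,
        new DiskBalancerReportHandler(diskBalancerManager));
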
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 5ca66e9eef..c6114c52f5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -22,7 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -47,10 +48,13 @@ public class DiskBalancerManager {
   public static final Logger LOG =
       LoggerFactory.getLogger(DiskBalancerManager.class);
 
+  private OzoneConfiguration conf;
   private final EventPublisher scmNodeEventPublisher;
   private final SCMContext scmContext;
   private final NodeManager nodeManager;
   private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
   private boolean useHostnames;
 
   /**
@@ -60,6 +64,7 @@ public class DiskBalancerManager {
                         EventPublisher eventPublisher,
                         SCMContext scmContext,
                         NodeManager nodeManager) {
+    this.conf = conf;
     this.scmNodeEventPublisher = eventPublisher;
     this.scmContext = scmContext;
     this.nodeManager = nodeManager;
@@ -95,42 +100,60 @@ public class DiskBalancerManager {
   * If hosts is null, return the status of all balancing datanodes.
    */
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException {
-    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
-        new ArrayList<>();
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> status,
+      int clientVersion) throws IOException {
     List<DatanodeDetails> filterDns = null;
     if (hosts.isPresent() && !hosts.get().isEmpty()) {
       filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
           useHostnames);
     }
 
-    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
-        HddsProtos.NodeState.HEALTHY)) {
-      if (shouldReturnDatanode(filterDns, datanodeDetails)) {
-        double volumeDensitySum =
-            getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
-        statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
-            .setCurrentVolumeDensitySum(volumeDensitySum)
-            .setDiskBalancerRunning(isRunning(datanodeDetails))
-            .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
-                    DiskBalancerStatus.DUMMY_STATUS)
-                .getDiskBalancerConfiguration().toProtobufBuilder())
-            .setNode(datanodeDetails.toProto(clientVersion))
-            .build());
-      }
+    // Apply the running-status filter only if one was specified.
+    HddsProtos.DiskBalancerRunningStatus filterStatus = status.orElse(null);
+
+    if (filterDns != null) {
+      return filterDns.stream()
+          .filter(dn -> shouldReturnDatanode(filterStatus, dn))
+          .map(nodeManager::getDatanodeInfo)
+          .map(dn -> getInfoProto(dn, clientVersion))
+          .collect(Collectors.toList());
+    } else {
+      return nodeManager.getAllNodes().stream()
+          .filter(dn -> shouldReturnDatanode(filterStatus, dn))
+          .map(dn -> getInfoProto((DatanodeInfo) dn, clientVersion))
+          .collect(Collectors.toList());
     }
-    return statusList;
   }
 
-  private boolean shouldReturnDatanode(List<DatanodeDetails> hosts,
+  private boolean shouldReturnDatanode(
+      HddsProtos.DiskBalancerRunningStatus status,
       DatanodeDetails datanodeDetails) {
-    if (hosts == null || hosts.isEmpty()) {
-      return isRunning(datanodeDetails);
-    } else {
-      return hosts.contains(datanodeDetails);
+    boolean shouldReturn = true;
+    // If a status filter is specified, skip datanodes whose status differs.
+    if (status != null && getRunningStatus(datanodeDetails) != status) {
+      shouldReturn = false;
     }
+    return shouldReturn;
   }
 
+  private HddsProtos.DatanodeDiskBalancerInfoProto getInfoProto(
+      DatanodeInfo dn, int clientVersion) {
+    double volumeDensitySum =
+        getVolumeDataDensitySumForDatanodeDetails(dn);
+    HddsProtos.DiskBalancerRunningStatus runningStatus =
+        getRunningStatus(dn);
+    HddsProtos.DatanodeDiskBalancerInfoProto.Builder builder =
+        HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+            .setNode(dn.toProto(clientVersion))
+            .setCurrentVolumeDensitySum(volumeDensitySum)
+            .setRunningStatus(runningStatus);
+    if (runningStatus != HddsProtos.DiskBalancerRunningStatus.UNKNOWN) {
+      builder.setDiskBalancerConf(statusMap.get(dn)
+          .getDiskBalancerConfiguration().toProtobufBuilder());
+    }
+    return builder.build();
+  }
   /**
    * Get volume density for a specific DatanodeDetails node.
    *
@@ -144,8 +167,7 @@ public class DiskBalancerManager {
     DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails;
 
     double totalCapacity = 0d, totalUsed = 0d;
-    for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto :
-        datanodeInfo.getStorageReports()) {
+    for (StorageReportProto reportProto : datanodeInfo.getStorageReports()) {
       totalCapacity += reportProto.getCapacity();
       totalUsed += reportProto.getScmUsed();
     }
@@ -162,10 +184,17 @@ public class DiskBalancerManager {
     return volumeDensitySum;
   }
 
-  private boolean isRunning(DatanodeDetails datanodeDetails) {
-    return statusMap
-        .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS)
-        .isRunning();
+  private HddsProtos.DiskBalancerRunningStatus getRunningStatus(
+      DatanodeDetails datanodeDetails) {
+    if (!statusMap.containsKey(datanodeDetails)) {
+      return HddsProtos.DiskBalancerRunningStatus.UNKNOWN;
+    } else {
+      if (statusMap.get(datanodeDetails).isRunning()) {
+        return HddsProtos.DiskBalancerRunningStatus.RUNNING;
+      } else {
+        return HddsProtos.DiskBalancerRunningStatus.STOPPED;
+      }
+    }
   }
 
   @VisibleForTesting
@@ -173,4 +202,24 @@ public class DiskBalancerManager {
     statusMap.put(datanodeDetails, new DiskBalancerStatus(true,
         new DiskBalancerConfiguration()));
   }
-}
\ No newline at end of file
+
+  public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
+      DatanodeDetails dn) {
+    boolean isRunning = reportProto.getIsRunning();
+    DiskBalancerConfiguration diskBalancerConfiguration =
+        reportProto.hasDiskBalancerConf() ?
+            DiskBalancerConfiguration.fromProtobuf(
+                reportProto.getDiskBalancerConf(), conf) :
+            new DiskBalancerConfiguration();
+    statusMap.put(dn, new DiskBalancerStatus(isRunning,
+        diskBalancerConfiguration));
+    if (reportProto.hasBalancedBytes()) {
+      balancedBytesMap.put(dn, reportProto.getBalancedBytes());
+    }
+  }
+
+  @VisibleForTesting
+  public Map<DatanodeDetails, DiskBalancerStatus> getStatusMap() {
+    return statusMap;
+  }
+}
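
A usage sketch of the reworked status query, modeled on the unit tests below;
diskBalancerManager is assumed to be an already constructed
DiskBalancerManager. An empty status Optional returns every node, while an
explicit status narrows the result:

    // List only datanodes whose DiskBalancer is currently running.
    List<HddsProtos.DatanodeDiskBalancerInfoProto> running =
        diskBalancerManager.getDiskBalancerStatus(
            Optional.empty(),  // no host filter
            Optional.of(HddsProtos.DiskBalancerRunningStatus.RUNNING),
            ClientVersion.CURRENT_VERSION);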
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
new file mode 100644
index 0000000000..47158972ec
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerReportHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles DiskBalancer Reports from datanode.
+ */
+public class DiskBalancerReportHandler implements
+    EventHandler<DiskBalancerReportFromDatanode> {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(DiskBalancerReportHandler.class);
+
+  private DiskBalancerManager diskBalancerManager;
+
+  public DiskBalancerReportHandler(DiskBalancerManager diskBalancerManager) {
+    this.diskBalancerManager = diskBalancerManager;
+  }
+
+  @Override
+  public void onMessage(DiskBalancerReportFromDatanode reportFromDatanode,
+      EventPublisher publisher) {
+    Preconditions.checkNotNull(reportFromDatanode);
+    DatanodeDetails dn = reportFromDatanode.getDatanodeDetails();
+    DiskBalancerReportProto diskBalancerReportProto =
+        reportFromDatanode.getReport();
+    Preconditions.checkNotNull(dn,
+        "DiskBalancer Report is missing DatanodeDetails.");
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Processing diskBalancer report for dn: {}", dn);
+    }
+    try {
+      diskBalancerManager.processDiskBalancerReport(
+          diskBalancerReportProto, dn);
+    } catch (Exception e) {
+      LOGGER.error("Failed to process diskBalancer report={} from dn={}.",
+          diskBalancerReportProto, dn, e);
+    }
+  }
+}
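
The handler simply unwraps the payload and delegates to
DiskBalancerManager.processDiskBalancerReport. A sketch of driving it
directly, modeled on the unit test added further down (report values are
illustrative):

    DiskBalancerReportProto report = DiskBalancerReportProto.newBuilder()
        .setIsRunning(true)
        .setBalancedBytes(1024L)
        .build();
    new DiskBalancerReportHandler(diskBalancerManager)
        .onMessage(new DiskBalancerReportFromDatanode(dn, report), null);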
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
index ed22e80e3c..65f577bcee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -32,8 +32,6 @@ public class DiskBalancerStatus {
   private boolean isRunning;
   private DiskBalancerConfiguration diskBalancerConfiguration;
 
-  public static final DiskBalancerStatus DUMMY_STATUS =
-      new DiskBalancerStatus(false, new DiskBalancerConfiguration());
 
   public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf) {
     this.isRunning = isRunning;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 8f72375bcd..c1dffe7f4b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -162,6 +162,14 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    */
   DatanodeUsageInfo getUsageInfo(DatanodeDetails dn);
 
+  /**
+   * Get the datanode info of a specified datanode.
+   *
+   * @param dn the datanode for which to get the info
+   * @return DatanodeInfo of the specified datanode
+   */
+  DatanodeInfo getDatanodeInfo(DatanodeDetails dn);
+
   /**
    * Return the node stat of the specified datanode.
    * @param datanodeDetails DatanodeDetails.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 7a62088ba7..935bfbc6a8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -860,6 +860,23 @@ public class SCMNodeManager implements NodeManager {
     return usageInfo;
   }
 
+  /**
+   * Get the DatanodeInfo of a specified datanode.
+   *
+   * @param dn the datanode for which to get the info
+   * @return DatanodeInfo of the specified datanode, or null if not found
+   */
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+    try {
+      return nodeStateManager.getNode(dn);
+    } catch (NodeNotFoundException e) {
+      LOG.warn("Cannot retrieve DatanodeInfo, datanode {} not found.",
+          dn.getUuid());
+      return null;
+    }
+  }
+
   /**
    * Return the node stat of the specified datanode.
    *
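
Because getDatanodeInfo returns null (rather than throwing) when the node is
unknown, callers should guard against it. A hypothetical caller sketch:

    DatanodeInfo info = nodeManager.getDatanodeInfo(dn);
    if (info == null) {
      return; // datanode not registered with SCM
    }
    for (StorageReportProto report : info.getStorageReports()) {
      // inspect per-volume capacity and usage
    }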
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 2f50456211..38a2503b55 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerOpResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -672,6 +673,13 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
               request.getDatanodeDiskBalancerInfoRequest(),
               request.getVersion()))
           .build();
+      case DatanodeDiskBalancerOp:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setDatanodeDiskBalancerOpResponse(getDatanodeDiskBalancerOp(
+                request.getDatanodeDiskBalancerOpRequest()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -1183,6 +1191,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
             request.getTransactionIdList()))
         .build();
   }
+
   public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
       StorageContainerLocationProtocolProtos.
           DatanodeDiskBalancerInfoRequestProto request, int clientVersion)
@@ -1195,7 +1204,9 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       break;
     case status:
       infoProtoList = impl.getDiskBalancerStatus(
-          Optional.of(request.getHostsList()), clientVersion);
+          Optional.of(request.getHostsList()),
+          Optional.of(request.getStatus()),
+          clientVersion);
       break;
     default:
       infoProtoList = null;
@@ -1204,4 +1215,43 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
         .addAllInfo(infoProtoList)
         .build();
   }
+
+  public DatanodeDiskBalancerOpResponseProto getDatanodeDiskBalancerOp(
+      StorageContainerLocationProtocolProtos.
+          DatanodeDiskBalancerOpRequestProto request)
+      throws IOException {
+    List<DatanodeAdminError> errors;
+    switch (request.getOpType()) {
+    case start:
+      errors = impl.startDiskBalancer(
+          Optional.of(request.getConf().getThreshold()),
+          Optional.of(request.getConf().getDiskBandwidthInMB()),
+          Optional.of(request.getConf().getParallelThread()),
+          Optional.of(request.getHostsList()));
+      break;
+    case update:
+      errors = impl.updateDiskBalancerConfiguration(
+          Optional.of(request.getConf().getThreshold()),
+          Optional.of(request.getConf().getDiskBandwidthInMB()),
+          Optional.of(request.getConf().getParallelThread()),
+          Optional.of(request.getHostsList()));
+      break;
+    case stop:
+      errors = impl.stopDiskBalancer(Optional.of(request.getHostsList()));
+      break;
+    default:
+      errors = new ArrayList<>();
+    }
+
+    DatanodeDiskBalancerOpResponseProto.Builder response =
+        DatanodeDiskBalancerOpResponseProto.newBuilder();
+    for (DatanodeAdminError e : errors) {
+      DatanodeAdminErrorResponseProto.Builder error =
+          DatanodeAdminErrorResponseProto.newBuilder();
+      error.setHost(e.getHostname());
+      error.setError(e.getError());
+      response.addFailedHosts(error);
+    }
+    return response.build();
+  }
 }
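
For context, a client-side sketch of the request this new branch serves. The
DiskBalancerConfigurationProto setters are the ones exercised by the new unit
test; the op-type enum name is an assumption here:

    DatanodeDiskBalancerOpRequestProto request =
        DatanodeDiskBalancerOpRequestProto.newBuilder()
            .setOpType(DiskBalancerOpType.start) // assumed enum type name
            .setConf(HddsProtos.DiskBalancerConfigurationProto.newBuilder()
                .setThreshold(10)
                .setDiskBandwidthInMB(100)
                .setParallelThread(2))
            .addAllHosts(hosts) // assumed setter for the repeated hosts field
            .build();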
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e9759da960..d3f23089be 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1142,7 +1142,9 @@ public class SCMClientProtocolServer implements
 
   @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts, int clientVersion) throws IOException {
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> status,
+      int clientVersion) throws IOException {
     // check admin authorisation
     try {
       getScm().checkAdminAccess(getRemoteUser());
@@ -1151,29 +1153,33 @@ public class SCMClientProtocolServer implements
       throw e;
     }
 
-    return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts,
+    return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts, status,
         clientVersion);
   }
 
   @Override
-  public void startDiskBalancer(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
-      throws IOException {
+  public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+      Optional<List<String>> hosts) throws IOException {
     // TODO: Send message to datanodes
+    return new ArrayList<>(); // empty list so callers can iterate safely
   }
 
   @Override
-  public void stopDiskBalancer(Optional<List<String>> hosts)
+  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
       throws IOException {
     // TODO: Send message to datanodes
+    return new ArrayList<>(); // empty list so callers can iterate safely
   }
 
 
   @Override
-  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidthInMB,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
     // TODO: Send message to datanodes
+    return new ArrayList<>(); // empty list so callers can iterate safely
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 02e1b3fc61..35e15e4efd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CRLStatusReport;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
@@ -58,6 +59,7 @@ import java.util.UUID;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DISK_BALANCER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents
     .INCREMENTAL_CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
@@ -212,6 +214,15 @@ public final class SCMDatanodeHeartbeatDispatcher {
                   commandStatusReport));
         }
       }
+
+      if (heartbeat.hasDiskBalancerReport()) {
+        LOG.debug("Dispatching DiskBalancer Report.");
+        eventPublisher.fireEvent(
+            DISK_BALANCER_REPORT,
+            new DiskBalancerReportFromDatanode(
+                datanodeDetails,
+                heartbeat.getDiskBalancerReport()));
+      }
     }
 
     return commands;
@@ -445,4 +456,16 @@ public final class SCMDatanodeHeartbeatDispatcher {
       super(datanodeDetails, report);
     }
   }
+
+  /**
+   * DiskBalancer report event payload with origin.
+   */
+  public static class DiskBalancerReportFromDatanode
+      extends ReportFromDatanode<DiskBalancerReportProto> {
+
+    public DiskBalancerReportFromDatanode(DatanodeDetails datanodeDetails,
+        DiskBalancerReportProto report) {
+      super(datanodeDetails, report);
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b8727173f1..f4317ec9b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerReportHandler;
 import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
 import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl;
@@ -454,6 +455,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new PipelineActionHandler(pipelineManager, scmContext, configuration);
     CRLStatusReportHandler crlStatusReportHandler =
         new CRLStatusReportHandler(certificateStore, configuration);
+    DiskBalancerReportHandler diskBalancerReportHandler =
+        new DiskBalancerReportHandler(diskBalancerManager);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@@ -528,7 +531,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
-
+    eventQueue.addHandler(SCMEvents.DISK_BALANCER_REPORT,
+        diskBalancerReportHandler);
   }
 
   private void initializeCertificateClient() {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index e1eaf251f5..9cb923c078 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -346,7 +346,36 @@ public class MockNodeManager implements NodeManager {
    */
   @Override
   public List<DatanodeDetails> getAllNodes() {
-    return new ArrayList<>(nodeMetricMap.keySet());
+    // Mock storage reports so tests such as TestDiskBalancer see usage data.
+    List<DatanodeDetails> healthyNodesWithInfo = new ArrayList<>();
+    for (Map.Entry<DatanodeDetails, SCMNodeStat> entry:
+        nodeMetricMap.entrySet()) {
+      NodeStatus nodeStatus = NodeStatus.inServiceHealthy();
+      if (staleNodes.contains(entry.getKey())) {
+        nodeStatus = NodeStatus.inServiceStale();
+      } else if (deadNodes.contains(entry.getKey())) {
+        nodeStatus = NodeStatus.inServiceDead();
+      }
+      DatanodeInfo di = new DatanodeInfo(entry.getKey(), nodeStatus,
+          UpgradeUtils.defaultLayoutVersionProto());
+
+      long capacity = entry.getValue().getCapacity().get();
+      long used = entry.getValue().getScmUsed().get();
+      long remaining = entry.getValue().getRemaining().get();
+      StorageReportProto storage1 = HddsTestUtils.createStorageReport(
+          di.getUuid(), "/data1-" + di.getUuidString(),
+          capacity, used, remaining, null);
+      MetadataStorageReportProto metaStorage1 =
+          HddsTestUtils.createMetadataStorageReport(
+              "/metadata1-" + di.getUuidString(), capacity, used,
+              remaining, null);
+      di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1)));
+      di.updateMetaDataStorageReports(
+          new ArrayList<>(Arrays.asList(metaStorage1)));
+
+      healthyNodesWithInfo.add(di);
+    }
+    return healthyNodesWithInfo;
   }
 
   /**
@@ -409,6 +438,26 @@ public class MockNodeManager implements NodeManager {
     return new DatanodeUsageInfo(datanodeDetails, stat);
   }
 
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dd) {
+    DatanodeInfo di = new DatanodeInfo(dd, NodeStatus.inServiceHealthy(),
+        UpgradeUtils.defaultLayoutVersionProto());
+    long capacity = nodeMetricMap.get(dd).getCapacity().get();
+    long used = nodeMetricMap.get(dd).getScmUsed().get();
+    long remaining = nodeMetricMap.get(dd).getRemaining().get();
+    StorageReportProto storage1 = HddsTestUtils.createStorageReport(
+        di.getUuid(), "/data1-" + di.getUuidString(),
+        capacity, used, remaining, null);
+    MetadataStorageReportProto metaStorage1 =
+        HddsTestUtils.createMetadataStorageReport(
+            "/metadata1-" + di.getUuidString(), capacity, used,
+            remaining, null);
+    di.updateStorageReports(new ArrayList<>(Arrays.asList(storage1)));
+    di.updateMetaDataStorageReports(
+        new ArrayList<>(Arrays.asList(metaStorage1)));
+    return di;
+  }
+
   /**
    * Return the node stat of the specified datanode.
    * @param datanodeDetails - datanode details.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index 22e01b9770..16f013b63f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -244,6 +244,11 @@ public class SimpleMockNodeManager implements NodeManager {
     return null;
   }
 
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+    return null;
+  }
+
   @Override
   public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
     return null;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
index 541c9764b7..7005dea292 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -21,8 +21,10 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.DiskBalancerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.ozone.test.GenericTestUtils;
@@ -33,6 +35,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -46,6 +49,8 @@ public class TestDiskBalancerManager {
   private NodeManager nodeManager;
   private OzoneConfiguration conf;
   private String storageDir;
+  private DiskBalancerReportHandler diskBalancerReportHandler;
+  private Random random;
 
   @BeforeEach
   public void setup() throws Exception {
@@ -56,6 +61,9 @@ public class TestDiskBalancerManager {
     nodeManager = new MockNodeManager(true, 3);
     diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(),
         SCMContext.emptyContext(), nodeManager);
+    diskBalancerReportHandler =
+        new DiskBalancerReportHandler(diskBalancerManager);
+    random = new Random();
   }
 
   @Test
@@ -73,6 +81,7 @@ public class TestDiskBalancerManager {
   @Test
   public void testDatanodeDiskBalancerStatus() throws IOException {
     diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0));
+    diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(1));
 
     // Simulate a user asking for the status of all 3 datanodes
     List<String> dns = nodeManager.getAllNodes().stream().map(
@@ -81,6 +90,7 @@ public class TestDiskBalancerManager {
 
     List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
         diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            Optional.empty(),
             ClientVersion.CURRENT_VERSION);
 
     Assertions.assertEquals(3, statusProtoList.size());
@@ -92,8 +102,32 @@ public class TestDiskBalancerManager {
 
     statusProtoList =
         diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+            Optional.empty(),
             ClientVersion.CURRENT_VERSION);
 
     Assertions.assertEquals(1, statusProtoList.size());
   }
+
+  @Test
+  public void testHandleDiskBalancerReportFromDatanode() {
+    for (DatanodeDetails dn: nodeManager.getAllNodes()) {
+      diskBalancerReportHandler.onMessage(
+          new DiskBalancerReportFromDatanode(dn, generateRandomReport()), null);
+    }
+
+    Assertions.assertEquals(3, diskBalancerManager.getStatusMap().size());
+  }
+
+  private DiskBalancerReportProto generateRandomReport() {
+    return DiskBalancerReportProto.newBuilder()
+        .setIsRunning(random.nextBoolean())
+        .setBalancedBytes(random.nextInt(10000))
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(random.nextInt(99))
+                .setParallelThread(random.nextInt(4) + 1)
+                .setDiskBandwidthInMB(random.nextInt(99) + 1)
+                .build())
+        .build();
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 436bccb09d..2be5da3655 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.DatanodeUsageInfo;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -189,6 +190,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
     return null;
   }
 
+  @Override
+  public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
+    return null;
+  }
+
   /**
    * Return the node stat of the specified datanode.
    *
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 6eb9c587b9..bc25a41795 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -505,31 +505,34 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   @Override
-  public void startDiskBalancer(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
-      throws IOException {
-    storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
-        hosts);
+  public List<DatanodeAdminError> startDiskBalancer(Optional<Double> threshold,
+      Optional<Long> bandwidthInMB, Optional<Integer> parallelThread,
+      Optional<List<String>> hosts) throws IOException {
+    return storageContainerLocationClient.startDiskBalancer(threshold,
+        bandwidthInMB, parallelThread, hosts);
   }
 
   @Override
-  public void stopDiskBalancer(Optional<List<String>> hosts)
+  public List<DatanodeAdminError> stopDiskBalancer(Optional<List<String>> hosts)
       throws IOException {
-    storageContainerLocationClient.stopDiskBalancer(hosts);
+    return storageContainerLocationClient.stopDiskBalancer(hosts);
   }
 
   @Override
   public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
-      Optional<List<String>> hosts) throws IOException {
+      Optional<List<String>> hosts,
+      Optional<HddsProtos.DiskBalancerRunningStatus> runningStatus)
+      throws IOException {
     return storageContainerLocationClient.getDiskBalancerStatus(hosts,
-        ClientVersion.CURRENT_VERSION);
+        runningStatus, ClientVersion.CURRENT_VERSION);
   }
 
   @Override
-  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
-      Optional<Double> bandwidth, Optional<List<String>> hosts)
+  public List<DatanodeAdminError> updateDiskBalancerConfiguration(
+      Optional<Double> threshold, Optional<Long> bandwidth,
+      Optional<Integer> parallelThread, Optional<List<String>> hosts)
       throws IOException {
-    storageContainerLocationClient.updateDiskBalancerConfiguration(threshold,
-        bandwidth, hosts);
+    return storageContainerLocationClient.updateDiskBalancerConfiguration(
+        threshold, bandwidth, parallelThread, hosts);
   }
 }
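
End to end, the admin client now surfaces per-host failures instead of
returning void. A hypothetical invocation, assuming an OzoneConfiguration
named conf and illustrative parameter values:

    ScmClient scmClient = new ContainerOperationClient(conf);
    List<DatanodeAdminError> errors = scmClient.startDiskBalancer(
        Optional.of(10.0),   // threshold
        Optional.of(100L),   // disk bandwidth in MB
        Optional.of(2),      // parallel threads
        Optional.empty());   // no host filter: all datanodes
    for (DatanodeAdminError error : errors) {
      System.err.println(error.getHostname() + ": " + error.getError());
    }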
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
index 267209cf03..32719f0d83 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.client.ScmClient;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -54,6 +55,7 @@ public class TestDiskBalancer {
   private static ScmClient storageClient;
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration ozoneConf;
+  private static DiskBalancerManager diskBalancerManager;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -63,6 +65,8 @@ public class TestDiskBalancer {
     cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
     storageClient = new ContainerOperationClient(ozoneConf);
     cluster.waitForClusterToBeReady();
+    diskBalancerManager = cluster.getStorageContainerManager()
+        .getDiskBalancerManager();
 
     for (DatanodeDetails dn: cluster.getStorageContainerManager()
         .getScmNodeManager().getAllNodes()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org