Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/08/26 07:41:21 UTC

[GitHub] [ozone] lokeshj1703 commented on a diff in pull request #3701: HDDS-7155. [DiskBalancer] Create interface between SCM and DN

lokeshj1703 commented on code in PR #3701:
URL: https://github.com/apache/ozone/pull/3701#discussion_r954755240


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java:
##########
@@ -1005,6 +1008,59 @@ public long getContainerCount() throws IOException {
     return response.getContainerCount();
   }
 
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+    DatanodeDiskBalancerInfoRequestProto request =
+        DatanodeDiskBalancerInfoRequestProto.newBuilder()
+            .setInfoType(DatanodeDiskBalancerInfoType.report)
+            .setCount(count)
+            .build();
+
+    DatanodeDiskBalancerInfoResponseProto response =
+        submitRequest(Type.DatanodeDiskBalancerInfo,
+            builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
+            .getDatanodeDiskBalancerInfoResponse();
+
+    return response.getInfoList();
+  }
+
+  @Override
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(

Review Comment:
   This returns the same info as getDiskBalancerReport but under a different name. Is that intended?



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains configuration values for the DiskBalancer.
+ */
+@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
+public final class DiskBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+
+  @Config(key = "volume.density.threshold", type = ConfigType.STRING,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "Threshold is a percentage in the range of 0 to 100. A " +
+          "datanode is considered balanced if for each volume, the " +
+          "utilization of the volume(used space to capacity ratio) differs" +
+          " from the utilization of the datanode(used space to capacity ratio" +
+          " of the entire datanode) no more than the threshold.")
+  private String threshold = "10";
+
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "The max balance speed.")
+  private long diskBandwidth = 10;
+
+  @Config(key = "parallel.thread", type = ConfigType.INT,
+      defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
+      description = "The max parallel balance thread count.")
+  private int parallelThread = 5;
+
+  /**
+   * Gets the threshold value for DiskBalancer.
+   *
+   * @return percentage value in the range 0 to 100
+   */
+  public double getThreshold() {
+    return Double.parseDouble(threshold);
+  }
+
+  public double getThresholdAsRatio() {
+    return Double.parseDouble(threshold) / 100;
+  }
+
+  /**
+   * Sets the threshold value for Disk Balancer.
+   *
+   * @param threshold a percentage value in the range 0 to 100
+   */
+  public void setThreshold(double threshold) {
+    if (threshold < 0d || threshold >= 100d) {
+      throw new IllegalArgumentException(
+          "Threshold must be a percentage(double) in the range 0 to 100.");
+    }
+    this.threshold = String.valueOf(threshold);
+  }
+
+  /**
+   * Gets the disk bandwidth value for Disk Balancer.
+   *
+   * @return max disk bandwidth per second
+   */
+  public double getDiskBandwidth() {
+    return diskBandwidth;
+  }
+
+  /**
+   * Sets the disk bandwidth value for Disk Balancer.
+   *
+   * @param diskBandwidth the bandwidth to control balance speed
+   */
+  public void setDiskBandwidth(long diskBandwidth) {
+    if (diskBandwidth <= 0L) {
+      throw new IllegalArgumentException(
+          "diskBandwidth must be a value larger than 0.");
+    }
+    this.diskBandwidth = diskBandwidth;
+  }
+
+  /**
+   * Gets the parallel thread for Disk Balancer.
+   *
+   * @return parallel thread
+   */
+  public int getParallelThread() {
+    return parallelThread;
+  }
+
+  /**
+   * Sets the parallel thread for Disk Balancer.
+   *
+   * @param parallelThread the parallel thread count
+   */
+  public void setParallelThread(int parallelThread) {
+    if (parallelThread <= 0) {
+      throw new IllegalArgumentException(
+          "parallelThread must be a value larger than 0.");
+    }
+    this.parallelThread = parallelThread;
+  }
+  @Override

Review Comment:
   NIT: Add a blank line before @Override.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java:
##########
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerManager.class);
+
+  private OzoneConfiguration conf;
+  private final EventPublisher scmNodeEventPublisher;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
+  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
+  private boolean useHostnames;
+
+  /**
+   * Constructs DiskBalancer Manager.
+   */
+  public DiskBalancerManager(OzoneConfiguration conf,
+                        EventPublisher eventPublisher,
+                        SCMContext scmContext,
+                        NodeManager nodeManager) {
+    this.conf = conf;
+    this.scmNodeEventPublisher = eventPublisher;
+    this.scmContext = scmContext;
+    this.nodeManager = nodeManager;
+    this.useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.statusMap = new ConcurrentHashMap<>();
+  }
+
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
+        new ArrayList<>();
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      double volumeDensitySum =
+          getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+      reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+          .setCurrentVolumeDensitySum(String.valueOf(volumeDensitySum))
+          .setNode(datanodeDetails.toProto(clientVersion))
+          .build());
+    }
+
+    reportList.sort((t1, t2) ->
+        (int)(Double.parseDouble(t2.getCurrentVolumeDensitySum()) -
+            Double.parseDouble(t1.getCurrentVolumeDensitySum())));
+    return reportList.stream().limit(count).collect(Collectors.toList());
+  }
+
+  /**
+   * If hosts is not null, return status of hosts;
+   * If hosts is null, return status of all datanodes in balancing.
+   */
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
+        new ArrayList<>();
+    List<DatanodeDetails> filterDns = null;
+    if (hosts.isPresent() && !hosts.get().isEmpty()) {
+      filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    }
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,

Review Comment:
   Loop over only filterDns here. filterDns can be initialized with all nodes and then narrowed when hosts are given.
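
A minimal sketch of this suggestion, assuming plain strings in place of DatanodeDetails. The class `FilterDnsSketch` and the helper `resolve` are hypothetical stand-ins (the latter for NodeUtils.mapHostnamesToDatanodes) and are not part of the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration; strings stand in for DatanodeDetails.
class FilterDnsSketch {
  // Stand-in for the hostname-to-datanode mapping done in the PR.
  static List<String> resolve(List<String> allNodes, List<String> hosts) {
    List<String> matched = new ArrayList<>();
    for (String host : hosts) {
      if (allNodes.contains(host)) {
        matched.add(host);
      }
    }
    return matched;
  }

  // Start with every node, then narrow only if hosts were supplied.
  static List<String> selectNodes(List<String> allNodes,
      Optional<List<String>> hosts) {
    List<String> filterDns = allNodes;
    if (hosts.isPresent() && !hosts.get().isEmpty()) {
      filterDns = resolve(allNodes, hosts.get());
    }
    return filterDns;
  }

  public static void main(String[] args) {
    List<String> all = Arrays.asList("dn1", "dn2", "dn3");
    // The caller loops over the selected nodes directly, so no
    // per-node filter check is needed inside the loop.
    for (String dn : selectNodes(all, Optional.of(Arrays.asList("dn2")))) {
      System.out.println(dn);
    }
  }
}
```

With this shape the loop body no longer needs a shouldReturnDatanode check, since the list it iterates is already the filtered one.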



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains configuration values for the DiskBalancer.
+ */
+@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
+public final class DiskBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+
+  @Config(key = "volume.density.threshold", type = ConfigType.STRING,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "Threshold is a percentage in the range of 0 to 100. A " +
+          "datanode is considered balanced if for each volume, the " +
+          "utilization of the volume(used space to capacity ratio) differs" +
+          " from the utilization of the datanode(used space to capacity ratio" +
+          " of the entire datanode) no more than the threshold.")
+  private String threshold = "10";
+
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "The max balance speed.")
+  private long diskBandwidth = 10;
+
+  @Config(key = "parallel.thread", type = ConfigType.INT,
+      defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
+      description = "The max parallel balance thread count.")
+  private int parallelThread = 5;
+
+  /**
+   * Gets the threshold value for DiskBalancer.
+   *
+   * @return percentage value in the range 0 to 100
+   */
+  public double getThreshold() {
+    return Double.parseDouble(threshold);
+  }
+
+  public double getThresholdAsRatio() {
+    return Double.parseDouble(threshold) / 100;
+  }
+
+  /**
+   * Sets the threshold value for Disk Balancer.
+   *
+   * @param threshold a percentage value in the range 0 to 100
+   */
+  public void setThreshold(double threshold) {
+    if (threshold < 0d || threshold >= 100d) {
+      throw new IllegalArgumentException(
+          "Threshold must be a percentage(double) in the range 0 to 100.");
+    }
+    this.threshold = String.valueOf(threshold);
+  }
+
+  /**
+   * Gets the disk bandwidth value for Disk Balancer.
+   *
+   * @return max disk bandwidth per second
+   */
+  public double getDiskBandwidth() {
+    return diskBandwidth;
+  }
+
+  /**
+   * Sets the disk bandwidth value for Disk Balancer.
+   *
+   * @param diskBandwidth the bandwidth to control balance speed
+   */
+  public void setDiskBandwidth(long diskBandwidth) {

Review Comment:
   Is diskBandwidth in MB? Would it be better to support a string format as input?
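
A rough sketch of what string input could look like, assuming a bare number means MB and that only MB and GB suffixes are accepted. The class `BandwidthParseSketch`, the method `parseToMB`, and the accepted suffix set are all assumptions for illustration, not existing Ozone code:

```java
import java.util.Locale;

// Hypothetical illustration of accepting "10MB" style values.
class BandwidthParseSketch {
  // Returns the bandwidth in MB per second. A bare number is read as MB.
  static long parseToMB(String value) {
    String v = value.trim().toUpperCase(Locale.ROOT);
    long multiplier = 1L;
    if (v.endsWith("GB")) {
      multiplier = 1024L;
      v = v.substring(0, v.length() - 2);
    } else if (v.endsWith("MB")) {
      v = v.substring(0, v.length() - 2);
    }
    long mb = Long.parseLong(v.trim()) * multiplier;
    // Same validation rule as setDiskBandwidth in the diff above.
    if (mb <= 0L) {
      throw new IllegalArgumentException(
          "diskBandwidth must be a value larger than 0.");
    }
    return mb;
  }

  public static void main(String[] args) {
    System.out.println(parseToMB("10MB"));
    System.out.println(parseToMB("1GB"));
  }
}
```

Carrying the unit in the value would also let the config key drop the "InMBPerSec" suffix, though that is a separate naming decision.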



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java:
##########
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerManager.class);
+
+  private OzoneConfiguration conf;
+  private final EventPublisher scmNodeEventPublisher;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
+  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
+  private boolean useHostnames;
+
+  /**
+   * Constructs DiskBalancer Manager.
+   */
+  public DiskBalancerManager(OzoneConfiguration conf,
+                        EventPublisher eventPublisher,
+                        SCMContext scmContext,
+                        NodeManager nodeManager) {
+    this.conf = conf;
+    this.scmNodeEventPublisher = eventPublisher;
+    this.scmContext = scmContext;
+    this.nodeManager = nodeManager;
+    this.useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);

Review Comment:
   Let's not have this as a field in DiskBalancerManager. I feel there should be an admin option to specify the type of data (hostname or IP address). Or maybe we can determine the type automatically.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java:
##########
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerManager.class);
+
+  private OzoneConfiguration conf;
+  private final EventPublisher scmNodeEventPublisher;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
+  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
+  private boolean useHostnames;
+
+  /**
+   * Constructs DiskBalancer Manager.
+   */
+  public DiskBalancerManager(OzoneConfiguration conf,
+                        EventPublisher eventPublisher,
+                        SCMContext scmContext,
+                        NodeManager nodeManager) {
+    this.conf = conf;
+    this.scmNodeEventPublisher = eventPublisher;
+    this.scmContext = scmContext;
+    this.nodeManager = nodeManager;
+    this.useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.statusMap = new ConcurrentHashMap<>();
+  }
+
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
+        new ArrayList<>();
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      double volumeDensitySum =
+          getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+      reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+          .setCurrentVolumeDensitySum(String.valueOf(volumeDensitySum))
+          .setNode(datanodeDetails.toProto(clientVersion))
+          .build());
+    }
+
+    reportList.sort((t1, t2) ->
+        (int)(Double.parseDouble(t2.getCurrentVolumeDensitySum()) -
+            Double.parseDouble(t1.getCurrentVolumeDensitySum())));
+    return reportList.stream().limit(count).collect(Collectors.toList());
+  }
+
+  /**
+   * If hosts is not null, return status of hosts;
+   * If hosts is null, return status of all datanodes in balancing.
+   */
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
+        new ArrayList<>();
+    List<DatanodeDetails> filterDns = null;
+    if (hosts.isPresent() && !hosts.get().isEmpty()) {
+      filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    }
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      if (shouldReturnDatanode(filterDns, datanodeDetails)) {
+        double volumeDensitySum =
+            getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+        statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+            .setCurrentVolumeDensitySum(String.valueOf(volumeDensitySum))
+            .setDiskBalancerRunning(isRunning(datanodeDetails))
+            .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
+                    DiskBalancerStatus.DUMMY_STATUS)

Review Comment:
   Maybe we should call this an unknown status instead of DUMMY_STATUS and report it to the admin.



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java:
##########
@@ -391,4 +391,47 @@ StatusAndMessages finalizeScmUpgrade(String upgradeClientID)
   StatusAndMessages queryUpgradeFinalizationProgress(
       String upgradeClientID, boolean force, boolean readonly)
       throws IOException;
+
+  /**
+   * Get DiskBalancer report.
+   * REPORT shows the current volume density of datanodes.
+   * @param count top datanodes that need balancing
+   * @return List of DatanodeDiskBalancerInfo.
+   * @throws IOException
+   */
+  List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count) throws IOException;

Review Comment:
   I think it might be a good idea to support different policies for ranking the top datanodes in the report, with the user optionally specifying a policy.
   For example: sort by the amount of unbalanced data in the datanode.
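
One possible shape for this, sketched with a hypothetical `NodeReport` summary type and a hypothetical `ReportPolicy` enum (neither exists in the PR). Each policy carries its own comparator, so adding a policy would not change the report method:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of pluggable report ordering.
class ReportPolicySketch {
  // Per-datanode summary; the fields are assumptions.
  static final class NodeReport {
    final String host;
    final double volumeDensitySum;
    final long unbalancedBytes;

    NodeReport(String host, double volumeDensitySum, long unbalancedBytes) {
      this.host = host;
      this.volumeDensitySum = volumeDensitySum;
      this.unbalancedBytes = unbalancedBytes;
    }
  }

  // Each policy orders datanodes from most to least in need of balancing.
  enum ReportPolicy {
    VOLUME_DENSITY(Comparator.comparingDouble(
        (NodeReport r) -> r.volumeDensitySum).reversed()),
    UNBALANCED_BYTES(Comparator.comparingLong(
        (NodeReport r) -> r.unbalancedBytes).reversed());

    private final Comparator<NodeReport> order;

    ReportPolicy(Comparator<NodeReport> order) {
      this.order = order;
    }
  }

  // Returns the hosts of the top `count` datanodes under the given policy.
  static List<String> top(List<NodeReport> reports, ReportPolicy policy,
      int count) {
    return reports.stream()
        .sorted(policy.order)
        .limit(count)
        .map(r -> r.host)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<NodeReport> reports = Arrays.asList(
        new NodeReport("dn1", 30.0, 100L),
        new NodeReport("dn2", 10.0, 500L),
        new NodeReport("dn3", 20.0, 300L));
    System.out.println(top(reports, ReportPolicy.VOLUME_DENSITY, 2));
    System.out.println(top(reports, ReportPolicy.UNBALANCED_BYTES, 2));
  }
}
```

The existing behaviour would correspond to the density policy being the default when the user specifies nothing.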



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java:
##########
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerManager.class);
+
+  private OzoneConfiguration conf;
+  private final EventPublisher scmNodeEventPublisher;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
+  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
+  private boolean useHostnames;
+
+  /**
+   * Constructs DiskBalancer Manager.
+   */
+  public DiskBalancerManager(OzoneConfiguration conf,
+                        EventPublisher eventPublisher,
+                        SCMContext scmContext,
+                        NodeManager nodeManager) {
+    this.conf = conf;
+    this.scmNodeEventPublisher = eventPublisher;
+    this.scmContext = scmContext;
+    this.nodeManager = nodeManager;
+    this.useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.statusMap = new ConcurrentHashMap<>();
+  }
+
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
+        new ArrayList<>();
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      double volumeDensitySum =
+          getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+      reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+          .setCurrentVolumeDensitySum(String.valueOf(volumeDensitySum))
+          .setNode(datanodeDetails.toProto(clientVersion))
+          .build());
+    }
+
+    reportList.sort((t1, t2) ->
+        (int)(Double.parseDouble(t2.getCurrentVolumeDensitySum()) -
+            Double.parseDouble(t1.getCurrentVolumeDensitySum())));
+    return reportList.stream().limit(count).collect(Collectors.toList());
+  }
+
+  /**
+   * If hosts is not null, return status of hosts;
+   * If hosts is null, return status of all datanodes in balancing.
+   */
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
+        new ArrayList<>();
+    List<DatanodeDetails> filterDns = null;
+    if (hosts.isPresent() && !hosts.get().isEmpty()) {
+      filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+          useHostnames);
+    }
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      if (shouldReturnDatanode(filterDns, datanodeDetails)) {
+        double volumeDensitySum =
+            getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+        statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+            .setCurrentVolumeDensitySum(String.valueOf(volumeDensitySum))
+            .setDiskBalancerRunning(isRunning(datanodeDetails))
+            .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
+                    DiskBalancerStatus.DUMMY_STATUS)

Review Comment:
   I think we should also show the unknown status when the admin asks for it, so that the admin knows a heartbeat has not yet been received for that datanode.
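
A small sketch of the idea, assuming a three-value status and string host keys. The class `UnknownStatusSketch`, the enum `RunningStatus`, and the `onHeartbeat` hook are hypothetical names, not code from the PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; strings stand in for DatanodeDetails.
class UnknownStatusSketch {
  enum RunningStatus { RUNNING, NOT_RUNNING, UNKNOWN }

  private final Map<String, RunningStatus> statusMap =
      new ConcurrentHashMap<>();

  // Called when a datanode reports its balancer state.
  void onHeartbeat(String host, boolean running) {
    statusMap.put(host,
        running ? RunningStatus.RUNNING : RunningStatus.NOT_RUNNING);
  }

  // A datanode with no report yet is surfaced as UNKNOWN rather than
  // being hidden behind a placeholder value.
  RunningStatus statusOf(String host) {
    return statusMap.getOrDefault(host, RunningStatus.UNKNOWN);
  }

  public static void main(String[] args) {
    UnknownStatusSketch sketch = new UnknownStatusSketch();
    System.out.println(sketch.statusOf("dn1"));
    sketch.onHeartbeat("dn1", true);
    System.out.println(sketch.statusOf("dn1"));
  }
}
```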



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java:
##########
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerManager.class);
+
+  private OzoneConfiguration conf;
+  private final EventPublisher scmNodeEventPublisher;
+  private final SCMContext scmContext;
+  private final NodeManager nodeManager;
+  private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+  private Map<DatanodeDetails, Long> balancedBytesMap;
+  private boolean useHostnames;
+
+  /**
+   * Constructs DiskBalancer Manager.
+   */
+  public DiskBalancerManager(OzoneConfiguration conf,
+                        EventPublisher eventPublisher,
+                        SCMContext scmContext,
+                        NodeManager nodeManager) {
+    this.conf = conf;
+    this.scmNodeEventPublisher = eventPublisher;
+    this.scmContext = scmContext;
+    this.nodeManager = nodeManager;
+    this.useHostnames = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+        DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+    this.statusMap = new ConcurrentHashMap<>();
+  }
+
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+      int count, int clientVersion) throws IOException {
+
+    List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
+        new ArrayList<>();
+
+    for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+        HddsProtos.NodeState.HEALTHY)) {
+      double volumeDensitySum =
+          getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+      reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+          .setCurrentVolumeDensitySum(String.valueOf(volumeDensitySum))
+          .setNode(datanodeDetails.toProto(clientVersion))
+          .build());
+    }
+
+    reportList.sort((t1, t2) ->
+        (int)(Double.parseDouble(t2.getCurrentVolumeDensitySum()) -
+            Double.parseDouble(t1.getCurrentVolumeDensitySum())));
+    return reportList.stream().limit(count).collect(Collectors.toList());
+  }
+
+  /**
+   * If hosts is not null, return status of hosts;
+   * If hosts is null, return status of all datanodes in balancing.
+   */
+  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+      Optional<List<String>> hosts, int clientVersion) throws IOException {

Review Comment:
   I think we should also have an option to filter the reported balancer statuses by current status (running, unknown, not_running, etc.). We can create a separate Jira for it, though.
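
As an illustration only, such a filter could take an optional status alongside the node list. The class `StatusFilterSketch`, the enum `RunningStatus`, and the method `filterByStatus` are assumed names, with strings standing in for DatanodeDetails:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of filtering datanodes by balancer status.
class StatusFilterSketch {
  enum RunningStatus { RUNNING, NOT_RUNNING, UNKNOWN }

  // Keeps nodes whose status matches `wanted`; an empty Optional keeps all.
  // Nodes missing from the map are treated as UNKNOWN.
  static List<String> filterByStatus(List<String> nodes,
      Map<String, RunningStatus> statusMap,
      Optional<RunningStatus> wanted) {
    List<String> result = new ArrayList<>();
    for (String node : nodes) {
      RunningStatus status =
          statusMap.getOrDefault(node, RunningStatus.UNKNOWN);
      if (!wanted.isPresent() || wanted.get() == status) {
        result.add(node);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> nodes = Arrays.asList("dn1", "dn2", "dn3");
    Map<String, RunningStatus> statusMap = new HashMap<>();
    statusMap.put("dn1", RunningStatus.RUNNING);
    statusMap.put("dn2", RunningStatus.NOT_RUNNING);
    System.out.println(
        filterByStatus(nodes, statusMap, Optional.of(RunningStatus.RUNNING)));
  }
}
```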



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

