Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/06/24 01:36:02 UTC
[06/49] hadoop git commit: HDFS-9420. Add DataModels for DiskBalancer. Contributed by Anu Engineer
HDFS-9420. Add DataModels for DiskBalancer. Contributed by Anu Engineer
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91a5c481
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91a5c481
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91a5c481
Branch: refs/heads/HDFS-1312
Commit: 91a5c4814381a4d4c3ce9b29a1f85299e03be835
Parents: 0b9edf6
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Nov 23 19:07:42 2015 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 23 18:18:48 2016 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/HDFS-1312_CHANGES.txt | 5 +
.../connectors/ClusterConnector.java | 44 +++
.../diskbalancer/connectors/package-info.java | 29 ++
.../datamodel/DiskBalancerCluster.java | 249 ++++++++++++++
.../datamodel/DiskBalancerDataNode.java | 269 +++++++++++++++
.../datamodel/DiskBalancerVolume.java | 330 +++++++++++++++++++
.../datamodel/DiskBalancerVolumeSet.java | 325 ++++++++++++++++++
.../diskbalancer/datamodel/package-info.java | 31 ++
.../hdfs/server/diskbalancer/package-info.java | 36 ++
.../diskbalancer/DiskBalancerTestUtil.java | 227 +++++++++++++
.../server/diskbalancer/TestDataModels.java | 224 +++++++++++++
.../diskbalancer/connectors/NullConnector.java | 59 ++++
12 files changed, 1828 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
new file mode 100644
index 0000000..5a71032
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/HDFS-1312_CHANGES.txt
@@ -0,0 +1,5 @@
+HDFS-1312 Change Log
+
+ NEW FEATURES
+
+ HDFS-9420. Add DataModels for DiskBalancer. (Anu Engineer via szetszwo)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java
new file mode 100644
index 0000000..3dbfec2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/ClusterConnector.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+
+import java.util.List;
+
+/**
+ * ClusterConnector interface hides all specifics about how we communicate
+ * with the HDFS cluster. This interface returns data in classes that
+ * diskbalancer understands.
+ */
+public interface ClusterConnector {
+
+ /**
+ * getNodes function returns a list of DiskBalancerDataNodes.
+ *
+ * @return List of DiskBalancerDataNodes
+ */
+ List<DiskBalancerDataNode> getNodes() throws Exception;
+
+ /**
+ * Returns info about the connector.
+ *
+ * @return String.
+ */
+ String getConnectorInfo();
+}
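
For illustration, a minimal sketch of what an implementation of this
interface can look like (hypothetical code, not part of this commit; the real
in-memory implementation, NullConnector, is added in the test sources below):

// would live in org.apache.hadoop.hdfs.server.diskbalancer.connectors
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;

import java.util.Collections;
import java.util.List;

/** A trivial connector that reports a fixed, empty set of nodes. */
public class EmptyConnector implements ClusterConnector {
  @Override
  public List<DiskBalancerDataNode> getNodes() throws Exception {
    // a real connector would query the cluster (or a model file) here
    return Collections.emptyList();
  }

  @Override
  public String getConnectorInfo() {
    return "Empty Connector : returns no nodes, for illustration only";
  }
}
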
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
new file mode 100644
index 0000000..8164804
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/package-info.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+/**
+ * Connectors package is a set of logical connectors that connect
+ * to various data sources to read the hadoop cluster information.
+ *
+ * We currently have one connector in this package:
+ *
+ * NullConnector - This is an in-memory connector that is useful in testing.
+ * We can create DataNodes on the fly, attach them to this connector and
+ * ask the diskBalancer Cluster to read data from this source.
+ */
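
As a rough usage sketch of the connector pattern (assuming the NullConnector
test helper added below; hypothetical driver class, not part of this commit):

import java.util.UUID;

import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;

public class ConnectorUsageSketch {
  public static void main(String[] args) throws Exception {
    NullConnector connector = new NullConnector();
    // create a DataNode on the fly and attach it to the connector
    connector.addNode(new DiskBalancerDataNode(UUID.randomUUID().toString()));

    DiskBalancerCluster cluster = new DiskBalancerCluster(connector);
    cluster.readClusterInfo(); // reads the nodes back through the connector
    System.out.println("Nodes read: " + cluster.getNodes().size()); // prints 1
  }
}
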
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
new file mode 100644
index 0000000..91f7eaa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerCluster.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * DiskBalancerCluster represents the nodes that we are working against.
+ * <p/>
+ * Please Note :
+ * <p/>
+ * Semantics of inclusionList and exclusionLists.
+ * <p/>
+ * If a non-empty inclusionList is specified then the diskBalancer assumes that
+ * the user is only interested in processing that list of nodes. This node list
+ * is checked against the exclusionList and only the nodes in inclusionList but
+ * not in exclusionList are processed.
+ * <p/>
+ * If inclusionList is empty, then we assume that all live nodes in the cluster
+ * are to be processed by diskBalancer. In that case diskBalancer will avoid any
+ * nodes specified in the exclusionList but will process all nodes in the
+ * cluster.
+ * <p/>
+ * In other words, an empty inclusionList means all the nodes; otherwise
+ * only the given list is processed, and the exclusionList is always honored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DiskBalancerCluster {
+
+ static final Log LOG = LogFactory.getLog(DiskBalancerCluster.class);
+ private final Set<String> exclusionList;
+ private final Set<String> inclusionList;
+ private ClusterConnector clusterConnector;
+ private List<DiskBalancerDataNode> nodes;
+ private String outputpath;
+
+ @JsonIgnore
+ private List<DiskBalancerDataNode> nodesToProcess;
+ private float threshold;
+
+ /**
+ * Empty Constructor needed by Jackson.
+ */
+ public DiskBalancerCluster() {
+ nodes = new LinkedList<>();
+ exclusionList = new TreeSet<>();
+ inclusionList = new TreeSet<>();
+
+ }
+
+ /**
+ * Constructs a DiskBalancerCluster.
+ *
+ * @param connector - ClusterConnector
+ * @throws IOException
+ */
+ public DiskBalancerCluster(ClusterConnector connector) throws IOException {
+ Preconditions.checkNotNull(connector);
+ clusterConnector = connector;
+ exclusionList = new TreeSet<>();
+ inclusionList = new TreeSet<>();
+ }
+
+ /**
+ * Parses a Json string and converts to DiskBalancerCluster.
+ *
+ * @param json - Json String
+ * @return DiskBalancerCluster
+ * @throws IOException
+ */
+ public static DiskBalancerCluster parseJson(String json) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(json, DiskBalancerCluster.class);
+ }
+
+ /**
+ * readClusterInfo connects to the cluster and reads the nodes' data. This
+ * data is used as the basis for the rest of the computation in
+ * DiskBalancerCluster.
+ */
+ public void readClusterInfo() throws Exception {
+ Preconditions.checkNotNull(clusterConnector);
+ LOG.info("Using connector : " + clusterConnector.getConnectorInfo());
+ nodes = clusterConnector.getNodes();
+ }
+
+ /**
+ * Gets all DataNodes in the Cluster.
+ *
+ * @return List of DiskBalancerDataNodes
+ */
+ public List<DiskBalancerDataNode> getNodes() {
+ return nodes;
+ }
+
+ /**
+ * Sets the list of nodes of this cluster.
+ *
+ * @param clusterNodes List of Nodes
+ */
+ public void setNodes(List<DiskBalancerDataNode> clusterNodes) {
+ this.nodes = clusterNodes;
+ }
+
+ /**
+ * Returns the current ExclusionList.
+ *
+ * @return List of Nodes that are excluded from diskBalancer right now.
+ */
+ public Set<String> getExclusionList() {
+ return exclusionList;
+ }
+
+ /**
+ * Sets the list of nodes to exclude from processing by diskBalancer.
+ *
+ * @param excludedNodes - exclusionList of nodes.
+ */
+ public void setExclusionList(Set<String> excludedNodes) {
+ this.exclusionList.addAll(excludedNodes);
+ }
+
+ /**
+ * Returns the threshold value. This is used for indicating how much skew is
+ * acceptable. This is expressed as a percentage: for example, to say 20% skew
+ * between volumes is acceptable, set this value to 20.
+ *
+ * @return float
+ */
+ public float getThreshold() {
+ return threshold;
+ }
+
+ /**
+ * Sets the threshold value.
+ *
+ * @param thresholdPercent - float - in percentage
+ */
+ public void setThreshold(float thresholdPercent) {
+ Preconditions.checkState((thresholdPercent >= 0.0f) &&
+ (thresholdPercent <= 100.0f), "A percentage value expected.");
+ this.threshold = thresholdPercent;
+ }
+
+ /**
+ * Gets the Inclusion list.
+ *
+ * @return List of machine to be processed by diskBalancer.
+ */
+ public Set<String> getInclusionList() {
+ return inclusionList;
+ }
+
+ /**
+ * Sets the inclusionList.
+ *
+ * @param includeNodes - set of machines to be processed by diskBalancer.
+ */
+ public void setInclusionList(Set<String> includeNodes) {
+ this.inclusionList.addAll(includeNodes);
+ }
+
+ /**
+ * returns a serialized json string.
+ *
+ * @return String - json
+ * @throws IOException
+ */
+ public String toJson() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.writeValueAsString(this);
+ }
+
+ /**
+ * Returns the Nodes to Process which is the real list of nodes processed by
+ * diskBalancer.
+ *
+ * @return List of DiskBalancerDataNodes
+ */
+ @JsonIgnore
+ public List<DiskBalancerDataNode> getNodesToProcess() {
+ return nodesToProcess;
+ }
+
+ /**
+ * Sets the nodes to process.
+ *
+ * @param dnNodesToProcess - List of DataNodes to process
+ */
+ @JsonIgnore
+ public void setNodesToProcess(List<DiskBalancerDataNode> dnNodesToProcess) {
+ this.nodesToProcess = dnNodesToProcess;
+ }
+
+ /**
+ * Returns the output path for this cluster.
+ */
+ public String getOutput() {
+ return outputpath;
+ }
+
+ /**
+ * Sets the output path for this run.
+ *
+ * @param output - Path
+ */
+ public void setOutput(String output) {
+ this.outputpath = output;
+ }
+
+ /**
+ * Writes a snapshot of the cluster to the specified directory.
+ *
+ * @param snapShotName - name of the snapshot
+ */
+ public void createSnapshot(String snapShotName) throws IOException {
+ String json = this.toJson();
+ File outFile = new File(getOutput() + "/" + snapShotName);
+ FileUtils.writeStringToFile(outFile, json);
+ }
+}
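
A short sketch of the JSON round trip and the inclusion/exclusion setters this
class exposes (hypothetical driver, using the DiskBalancerTestUtil helper from
the test sources below):

import java.util.Collections;

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;

public class ClusterJsonSketch {
  public static void main(String[] args) throws Exception {
    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
    DiskBalancerCluster cluster =
        util.createRandCluster(2, new StorageType[] {StorageType.DISK}, 2);

    // an empty inclusionList means "all nodes"; the exclusionList is honored
    cluster.setExclusionList(Collections.singleton("node-to-skip"));

    // serialize to JSON and parse it back; the node list survives the trip
    DiskBalancerCluster copy = DiskBalancerCluster.parseJson(cluster.toJson());
    System.out.println("Nodes after round trip: " + copy.getNodes().size());
  }
}
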
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
new file mode 100644
index 0000000..87030db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerDataNode.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * DiskBalancerDataNode represents a DataNode that exists in the cluster. It
+ * also contains a metric called nodeDataDensity which allows us to compare
+ * between a set of Nodes.
+ */
+public class DiskBalancerDataNode implements Comparable<DiskBalancerDataNode> {
+ private float nodeDataDensity;
+ private Map<String, DiskBalancerVolumeSet> volumeSets;
+ private String dataNodeUUID;
+ private String dataNodeIP;
+ private int dataNodePort;
+ private String dataNodeName;
+ private int volumeCount;
+
+ /**
+ * Constructs an Empty Data Node.
+ */
+ public DiskBalancerDataNode() {
+ }
+
+ /**
+ * Constructs a DataNode.
+ *
+ * @param dataNodeID - Node ID
+ */
+ public DiskBalancerDataNode(String dataNodeID) {
+ this.dataNodeUUID = dataNodeID;
+ volumeSets = new HashMap<>();
+ }
+
+ /**
+ * Returns the IP address of this Node.
+ *
+ * @return IP Address string
+ */
+ public String getDataNodeIP() {
+ return dataNodeIP;
+ }
+
+ /**
+ * Sets the IP address of this Node.
+ *
+ * @param ipaddress - IP Address
+ */
+ public void setDataNodeIP(String ipaddress) {
+ this.dataNodeIP = ipaddress;
+ }
+
+ /**
+ * Returns the Port of this DataNode.
+ *
+ * @return Port Number
+ */
+ public int getDataNodePort() {
+ return dataNodePort;
+ }
+
+ /**
+ * Sets the DataNode Port number.
+ *
+ * @param port - Datanode Port Number
+ */
+ public void setDataNodePort(int port) {
+ this.dataNodePort = port;
+ }
+
+ /**
+ * Get DataNode DNS name.
+ *
+ * @return name of the node
+ */
+ public String getDataNodeName() {
+ return dataNodeName;
+ }
+
+ /**
+ * Sets node's DNS name.
+ *
+ * @param name - Data node name
+ */
+ public void setDataNodeName(String name) {
+ this.dataNodeName = name;
+ }
+
+ /**
+ * Returns the Volume sets on this node.
+ *
+ * @return a Map of VolumeSets
+ */
+ public Map<String, DiskBalancerVolumeSet> getVolumeSets() {
+ return volumeSets;
+ }
+
+ /**
+ * Returns datanode ID.
+ **/
+ public String getDataNodeUUID() {
+ return dataNodeUUID;
+ }
+
+ /**
+ * Sets Datanode UUID.
+ *
+ * @param nodeID - Node ID.
+ */
+ public void setDataNodeUUID(String nodeID) {
+ this.dataNodeUUID = nodeID;
+ }
+
+ /**
+ * Indicates whether some other object is "equal to" this one.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if ((obj == null) || (obj.getClass() != getClass())) {
+ return false;
+ }
+ DiskBalancerDataNode that = (DiskBalancerDataNode) obj;
+ return dataNodeUUID.equals(that.getDataNodeUUID());
+ }
+
+ /**
+ * Compares this object with the specified object for order. Returns a
+ * negative integer, zero, or a positive integer as this object is less than,
+ * equal to, or greater than the specified object.
+ *
+ * @param that the object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is
+ * less than, equal to, or greater than the specified object.
+ * @throws NullPointerException if the specified object is null
+ * @throws ClassCastException if the specified object's type prevents it
+ * from being compared to this object.
+ */
+ @Override
+ public int compareTo(DiskBalancerDataNode that) {
+ Preconditions.checkNotNull(that);
+ return Float.compare(this.nodeDataDensity, that.getNodeDataDensity());
+ }
+
+ /**
+ * Returns a hash code value for the object. This method is supported for the
+ * benefit of hash tables such as those provided by {@link HashMap}.
+ */
+ @Override
+ public int hashCode() {
+ return dataNodeUUID == null ? 0 : dataNodeUUID.hashCode();
+ }
+
+ /**
+ * Returns the nodeDataDensity metric.
+ *
+ * @return float
+ */
+ public float getNodeDataDensity() {
+ return nodeDataDensity;
+ }
+
+ /**
+ * Computes the node's data density.
+ * <p/>
+ * This metric allows us to compare different nodes and how well the data is
+ * spread across a set of volumes inside the node.
+ */
+ public void computeNodeDensity() {
+ float sum = 0;
+ int volcount = 0;
+ for (DiskBalancerVolumeSet vset : volumeSets.values()) {
+ for (DiskBalancerVolume vol : vset.getVolumes()) {
+ sum += Math.abs(vol.getVolumeDataDensity());
+ volcount++;
+ }
+ }
+ nodeDataDensity = sum;
+ this.volumeCount = volcount;
+
+ }
+
+ /**
+ * Computes if this node needs balancing at all.
+ *
+ * @param threshold - Percentage
+ * @return true or false
+ */
+ public boolean isBalancingNeeded(float threshold) {
+ for (DiskBalancerVolumeSet vSet : getVolumeSets().values()) {
+ if (vSet.isBalancingNeeded(threshold)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Adds a volume to the DataNode.
+ * <p/>
+ * It is assumed that we have one thread per node, hence this call is not
+ * synchronized, nor is the map protected.
+ *
+ * @param volume - volume
+ */
+ public void addVolume(DiskBalancerVolume volume) throws Exception {
+ Preconditions.checkNotNull(volume, "volume cannot be null");
+ Preconditions.checkNotNull(volumeSets, "volume sets cannot be null");
+ Preconditions
+ .checkNotNull(volume.getStorageType(), "storage type cannot be null");
+
+ String volumeSetKey = volume.getStorageType();
+ DiskBalancerVolumeSet vSet;
+ if (volumeSets.containsKey(volumeSetKey)) {
+ vSet = volumeSets.get(volumeSetKey);
+ } else {
+ vSet = new DiskBalancerVolumeSet(volume.isTransient());
+ volumeSets.put(volumeSetKey, vSet);
+ }
+
+ vSet.addVolume(volume);
+ computeNodeDensity();
+ }
+
+ /**
+ * Returns how many volumes are in the DataNode.
+ *
+ * @return int
+ */
+ public int getVolumeCount() {
+ return volumeCount;
+ }
+
+
+}
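
To make the density bookkeeping concrete, a small sketch (made-up sizes and
names, not part of this commit) of adding volumes and checking whether a node
needs balancing:

import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;

public class DataNodeDensitySketch {
  public static void main(String[] args) throws Exception {
    DiskBalancerDataNode node = new DiskBalancerDataNode("example-uuid");

    DiskBalancerVolume full = new DiskBalancerVolume();
    full.setUuid("vol-1");
    full.setStorageType("DISK");
    full.setCapacity(1000000000000L); // 1 TB
    full.setUsed(900000000000L);      // heavily used

    DiskBalancerVolume empty = new DiskBalancerVolume();
    empty.setUuid("vol-2");
    empty.setStorageType("DISK");
    empty.setCapacity(1000000000000L);
    empty.setUsed(100000000000L);     // lightly used

    // addVolume groups volumes by storage type and recomputes densities
    node.addVolume(full);
    node.addVolume(empty);

    System.out.println("node density  : " + node.getNodeDataDensity()); // 0.8
    System.out.println("needs balance : " + node.isBalancingNeeded(10.0f));
  }
}
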
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
new file mode 100644
index 0000000..a608248
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolume.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * DiskBalancerVolume represents a volume in the DataNode.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DiskBalancerVolume {
+ private String path;
+ private long capacity;
+ private String storageType;
+ private long used;
+ private long reserved;
+ private String uuid;
+ private boolean failed;
+ private boolean isTransient;
+ private float volumeDataDensity;
+ private boolean skip = false;
+ private boolean isReadOnly;
+
+ /**
+ * Constructs DiskBalancerVolume.
+ */
+ public DiskBalancerVolume() {
+ }
+
+ /**
+ * Parses a Json string and converts to DiskBalancerVolume.
+ *
+ * @param json - Json String
+ *
+ * @return DiskBalancerCluster
+ *
+ * @throws IOException
+ */
+ public static DiskBalancerVolume parseJson(String json) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(json, DiskBalancerVolume.class);
+ }
+
+ /**
+ * Gets this volume's data density.
+ * Please see DiskBalancerVolumeSet#computeVolumeDataDensity to see how
+ * this is computed.
+ *
+ * @return float.
+ */
+ public float getVolumeDataDensity() {
+ return volumeDataDensity;
+ }
+
+ /**
+ * Sets this volume's data density.
+ *
+ * @param volDataDensity - density
+ */
+ public void setVolumeDataDensity(float volDataDensity) {
+ this.volumeDataDensity = volDataDensity;
+ }
+
+ /**
+ * Indicates if the volume is Transient in nature.
+ *
+ * @return true or false.
+ */
+ public boolean isTransient() {
+ return isTransient;
+ }
+
+ /**
+ * Sets the volume's transient nature.
+ *
+ * @param aTransient - bool
+ */
+ public void setTransient(boolean aTransient) {
+ this.isTransient = aTransient;
+ }
+
+ /**
+ * Compares two volumes and decides if it is the same volume.
+ *
+ * @param o Volume Object
+ *
+ * @return boolean
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DiskBalancerVolume that = (DiskBalancerVolume) o;
+ return uuid.equals(that.uuid);
+ }
+
+ /**
+ * Computes hash code for a diskBalancerVolume.
+ *
+ * @return int
+ */
+ @Override
+ public int hashCode() {
+ return uuid.hashCode();
+ }
+
+ /**
+ * Capacity of this volume.
+ *
+ * @return long
+ */
+ public long getCapacity() {
+ return capacity;
+ }
+
+ /**
+ * Sets the capacity of this volume.
+ *
+ * @param totalCapacity long
+ */
+ public void setCapacity(long totalCapacity) {
+ this.capacity = totalCapacity;
+ }
+
+ /**
+ * Indicates if this is a failed volume.
+ *
+ * @return boolean
+ */
+ public boolean isFailed() {
+ return failed;
+ }
+
+ /**
+ * Sets the failed flag for this volume.
+ *
+ * @param fail boolean
+ */
+ public void setFailed(boolean fail) {
+ this.failed = fail;
+ }
+
+ /**
+ * Returns the path for this volume.
+ *
+ * @return String
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * Sets the path for this volume.
+ *
+ * @param volPath Path
+ */
+ public void setPath(String volPath) {
+ this.path = volPath;
+ }
+
+ /**
+ * Gets the reserved size for this volume.
+ *
+ * @return Long - Reserved size.
+ */
+ public long getReserved() {
+ return reserved;
+ }
+
+ /**
+ * Sets the reserved size.
+ *
+ * @param reservedSize -- Sets the reserved.
+ */
+ public void setReserved(long reservedSize) {
+ this.reserved = reservedSize;
+ }
+
+ /**
+ * Gets the StorageType.
+ *
+ * @return String StorageType.
+ */
+ public String getStorageType() {
+ return storageType;
+ }
+
+ /**
+ * Sets the StorageType.
+ *
+ * @param typeOfStorage - Storage Type String.
+ */
+ public void setStorageType(String typeOfStorage) {
+ this.storageType = typeOfStorage;
+ }
+
+ /**
+ * Gets the dfsUsed Size.
+ *
+ * @return - long - used space
+ */
+ public long getUsed() {
+ return used;
+ }
+
+ /**
+ * Sets the dfsUsed space for this volume.
+ *
+ * @param dfsUsedSpace - dfsUsedSpace for this volume.
+ */
+ public void setUsed(long dfsUsedSpace) {
+ Preconditions.checkArgument(dfsUsedSpace < this.getCapacity());
+ this.used = dfsUsedSpace;
+ }
+
+ /**
+ * Gets the uuid for this volume.
+ *
+ * @return String - uuid of the volume
+ */
+ public String getUuid() {
+ return uuid;
+ }
+
+ /**
+ * Sets the uuid for this volume.
+ *
+ * @param id - String
+ */
+ public void setUuid(String id) {
+ this.uuid = id;
+ }
+
+ /**
+ * Returns the effective capacity of a volume.
+ *
+ * @return long - capacity minus reserved space.
+ */
+ @JsonIgnore
+ public long computeEffectiveCapacity() {
+ return getCapacity() - getReserved();
+ }
+
+ /**
+ * returns a Json String.
+ *
+ * @return String
+ *
+ * @throws IOException
+ */
+ public String toJson() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.writeValueAsString(this);
+ }
+
+ /**
+ * returns if we should skip this volume.
+ * @return true / false
+ */
+ public boolean isSkip() {
+ return skip;
+ }
+
+ /**
+ * Sets the Skip value for this volume.
+ * @param skipValue bool
+ */
+ public void setSkip(boolean skipValue) {
+ this.skip = skipValue;
+ }
+
+ /**
+ * Returns the used-to-capacity ratio of a disk.
+ * This is useful in debugging disk usage.
+ * @return float
+ */
+ public float computeUsedPercentage() {
+ return (float) (getUsed()) / (float) (getCapacity());
+ }
+
+ /**
+ * Sets whether this volume is transient.
+ * @param transientValue - true if the volume is transient
+ */
+ public void setIsTransient(boolean transientValue) {
+ this.isTransient = transientValue;
+ }
+
+ /**
+ * Tells us if this volume is read-only.
+ * @return true / false
+ */
+ public boolean isReadOnly() {
+ return isReadOnly;
+ }
+
+ /**
+ * Sets this volume as read only.
+ * @param readOnly - boolean
+ */
+ public void setReadOnly(boolean readOnly) {
+ isReadOnly = readOnly;
+ }
+
+}
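
A quick worked sketch (toy numbers, hypothetical driver) of the capacity
accounting above:

import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;

public class VolumeCapacitySketch {
  public static void main(String[] args) throws Exception {
    DiskBalancerVolume vol = new DiskBalancerVolume();
    vol.setUuid("vol-1");
    vol.setStorageType("DISK");
    vol.setCapacity(1000L); // toy units
    vol.setReserved(100L);  // non-dfs reserved space
    vol.setUsed(450L);

    // effective capacity = capacity - reserved = 900
    System.out.println(vol.computeEffectiveCapacity());
    // used ratio = used / capacity = 0.45
    System.out.println(vol.computeUsedPercentage());
    // the JSON round trip preserves the fields
    System.out.println(DiskBalancerVolume.parseJson(vol.toJson()).getCapacity());
  }
}
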
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
new file mode 100644
index 0000000..15c21ac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/DiskBalancerVolumeSet.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.htrace.fasterxml.jackson.annotation.JsonProperty;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+/**
+ * DiskBalancerVolumeSet is a collection of storage devices on the
+ * data node which are of similar StorageType.
+ */
+@JsonIgnoreProperties({"sortedQueue", "volumeCount", "idealUsed"})
+public class DiskBalancerVolumeSet {
+ static final Log LOG = LogFactory.getLog(DiskBalancerVolumeSet.class);
+ private final int maxDisks = 256;
+
+ @JsonProperty("transient")
+ private boolean isTransient;
+ private Set<DiskBalancerVolume> volumes;
+
+ @JsonIgnore
+ private TreeSet<DiskBalancerVolume> sortedQueue;
+ private String storageType;
+ private String setID;
+
+ private float idealUsed;
+
+
+ /**
+ * Constructs an empty DiskBalancerVolumeSet.
+ * This is needed by Jackson.
+ */
+ public DiskBalancerVolumeSet() {
+ setID = UUID.randomUUID().toString();
+ }
+
+ /**
+ * Constructs a DiskBalancerVolumeSet.
+ *
+ * @param isTransient - boolean
+ */
+ public DiskBalancerVolumeSet(boolean isTransient) {
+ this.isTransient = isTransient;
+ volumes = new HashSet<>(maxDisks);
+ sortedQueue = new TreeSet<>(new MinHeap());
+ this.storageType = null;
+ setID = UUID.randomUUID().toString();
+ }
+
+ /**
+ * Constructs a copy of the given DiskBalancerVolumeSet.
+ *
+ * @param volumeSet - volumeSet to copy
+ */
+ public DiskBalancerVolumeSet(DiskBalancerVolumeSet volumeSet) {
+ this.isTransient = volumeSet.isTransient();
+ this.storageType = volumeSet.storageType;
+ this.volumes = new HashSet<>(volumeSet.volumes);
+ sortedQueue = new TreeSet<>(new MinHeap());
+ setID = UUID.randomUUID().toString();
+ }
+
+ /**
+ * Tells us if this volumeSet is transient.
+ *
+ * @return - true or false
+ */
+ @JsonProperty("transient")
+ public boolean isTransient() {
+ return isTransient;
+ }
+
+ /**
+ * Set the transient properties for this volumeSet.
+ *
+ * @param transientValue - Boolean
+ */
+ @JsonProperty("transient")
+ public void setTransient(boolean transientValue) {
+ this.isTransient = transientValue;
+ }
+
+ /**
+ * Computes Volume Data Density. Adding a new volume changes
+ * the volumeDataDensity for all volumes. So we throw away
+ * our priority queue and recompute everything.
+ *
+ * We discard failed volumes from this computation.
+ *
+ * totalCapacity = totalCapacity of this volumeSet
+ * totalUsed = totalDfsUsed for this volumeSet
+ * idealUsed = totalUsed / totalCapacity
+ * dfsUsedRatio = dfsUsedOnAVolume / Capacity On that Volume
+ * volumeDataDensity = idealUsed - dfsUsedRatio
+ */
+ public void computeVolumeDataDensity() {
+ long totalCapacity = 0;
+ long totalUsed = 0;
+ sortedQueue.clear();
+
+ // when we plan to re-distribute data we need to make
+ // sure that we skip failed volumes.
+ for (DiskBalancerVolume volume : volumes) {
+ if (!volume.isFailed() && !volume.isSkip()) {
+
+ if (volume.computeEffectiveCapacity() < 0) {
+ skipMisConfiguredVolume(volume);
+ continue;
+ }
+
+ totalCapacity += volume.computeEffectiveCapacity();
+ totalUsed += volume.getUsed();
+ }
+ }
+
+ if (totalCapacity != 0) {
+ this.idealUsed = totalUsed / (float) totalCapacity;
+ }
+
+ for (DiskBalancerVolume volume : volumes) {
+ if (!volume.isFailed() && !volume.isSkip()) {
+ float dfsUsedRatio =
+ volume.getUsed() / (float) volume.computeEffectiveCapacity();
+ volume.setVolumeDataDensity(this.idealUsed - dfsUsedRatio);
+ sortedQueue.add(volume);
+ }
+ }
+ }
+
+ private void skipMisConfiguredVolume(DiskBalancerVolume volume) {
+ //probably points to some sort of mis-configuration. Log this and skip
+ // processing this volume.
+ String errMessage = String.format("Real capacity is negative. " +
+ "This usually points to some " +
+ "kind of mis-configuration.%n" +
+ "Capacity : %d Reserved : %d " +
+ "realCap = capacity - " +
+ "reserved = %d.%n" +
+ "Skipping this volume from " +
+ "all processing. type : %s id" +
+ " :%s",
+ volume.getCapacity(),
+ volume.getReserved(),
+ volume.computeEffectiveCapacity(),
+ volume.getStorageType(),
+ volume.getUuid());
+
+ LOG.fatal(errMessage);
+ volume.setSkip(true);
+ }
+
+ /**
+ * Returns the number of volumes in the Volume Set.
+ *
+ * @return int
+ */
+ @JsonIgnore
+ public int getVolumeCount() {
+ return volumes.size();
+ }
+
+ /**
+ * Get Storage Type.
+ *
+ * @return String
+ */
+ public String getStorageType() {
+ return storageType;
+ }
+
+ /**
+ * Set Storage Type.
+ * @param typeOfStorage -- StorageType
+ */
+ public void setStorageType(String typeOfStorage) {
+ this.storageType = typeOfStorage;
+ }
+
+ /**
+ * adds a given volume into this volume set.
+ *
+ * @param volume - volume to add.
+ *
+ * @throws Exception
+ */
+ public void addVolume(DiskBalancerVolume volume) throws Exception {
+ Preconditions.checkNotNull(volume, "volume cannot be null");
+ Preconditions.checkState(isTransient() == volume.isTransient(),
+ "Mismatch in volumeSet and volume's transient " +
+ "properties.");
+
+
+ if (this.storageType == null) {
+ Preconditions.checkState(volumes.size() == 0L, "Storage Type is Null but"
+ + " volume size is " + volumes.size());
+ this.storageType = volume.getStorageType();
+ } else {
+ Preconditions.checkState(this.storageType.equals(volume.getStorageType()),
+ "Adding wrong type of disk to this volume set");
+ }
+ volumes.add(volume);
+ computeVolumeDataDensity();
+
+ }
+
+ /**
+ * Returns a list diskVolumes that are part of this volume set.
+ *
+ * @return List
+ */
+ public List<DiskBalancerVolume> getVolumes() {
+ return new ArrayList<>(volumes);
+ }
+
+
+ @JsonIgnore
+ public TreeSet<DiskBalancerVolume> getSortedQueue() {
+ return sortedQueue;
+ }
+
+ /**
+ * Computes whether we need to do any balancing on this volume Set at all.
+ * It checks if any disks are outside the threshold value.
+ *
+ * @param thresholdPercentage - threshold - in percentage
+ *
+ * @return true if balancing is needed false otherwise.
+ */
+ public boolean isBalancingNeeded(float thresholdPercentage) {
+ float threshold = thresholdPercentage / 100.0f;
+
+ if (volumes == null || volumes.size() <= 1) {
+ // there is nothing we can do with a single volume.
+ // so no planning needed.
+ return false;
+ }
+
+ for (DiskBalancerVolume vol : volumes) {
+ boolean notSkip = !vol.isFailed() && !vol.isTransient() && !vol.isSkip();
+ if ((Math.abs(vol.getVolumeDataDensity()) > threshold) && notSkip) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Remove a volume from the current set.
+ *
+ * This call does not recompute the volumeDataDensity. It has to be
+ * done manually after this call.
+ *
+ * @param volume - Volume to remove
+ */
+ public void removeVolume(DiskBalancerVolume volume) {
+ volumes.remove(volume);
+ sortedQueue.remove(volume);
+ }
+
+ /**
+ * Get Volume Set ID.
+ * @return String
+ */
+ public String getSetID() {
+ return setID;
+ }
+
+ /**
+ * Set VolumeSet ID.
+ * @param volID String
+ */
+ public void setSetID(String volID) {
+ this.setID = volID;
+ }
+
+ /**
+ * Gets the idealUsed for this volume set.
+ */
+
+ @JsonIgnore
+ public float getIdealUsed() {
+ return this.idealUsed;
+ }
+
+ static class MinHeap implements Comparator<DiskBalancerVolume>, Serializable {
+
+ /**
+ * Compares its two arguments for order. Returns a negative integer,
+ * zero, or a positive integer as the first argument is less than, equal
+ * to, or greater than the second.
+ */
+ @Override
+ public int compare(DiskBalancerVolume first, DiskBalancerVolume second) {
+ return Float
+ .compare(second.getVolumeDataDensity(), first.getVolumeDataDensity());
+ }
+ }
+}
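
To illustrate the formula above with made-up numbers: given two volumes of
effective capacity 100 each, with 80 and 20 units used, idealUsed is
(80 + 20) / 200 = 0.5, so the volumes get densities 0.5 - 0.8 = -0.3 and
0.5 - 0.2 = +0.3, and a 10% threshold flags the set as needing balancing.
A sketch (hypothetical driver, not part of this commit):

import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;

public class DensitySketch {
  public static void main(String[] args) throws Exception {
    DiskBalancerVolumeSet vSet = new DiskBalancerVolumeSet(false);

    DiskBalancerVolume hot = new DiskBalancerVolume();
    hot.setUuid("hot");
    hot.setStorageType("DISK");
    hot.setCapacity(100L);
    hot.setUsed(80L);

    DiskBalancerVolume cold = new DiskBalancerVolume();
    cold.setUuid("cold");
    cold.setStorageType("DISK");
    cold.setCapacity(100L);
    cold.setUsed(20L);

    vSet.addVolume(hot);  // each add recomputes densities for the whole set
    vSet.addVolume(cold);

    System.out.println(vSet.getIdealUsed());           // 0.5
    System.out.println(hot.getVolumeDataDensity());    // -0.3
    System.out.println(vSet.isBalancingNeeded(10.0f)); // true
  }
}
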
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java
new file mode 100644
index 0000000..f72e283
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/datamodel/package-info.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer.datamodel;
+/**
+ * Disk Balancer Data Model is the Data Model for the cluster that
+ * Disk Balancer is working against. This information is read
+ * directly from NameNode or from a user supplied json model file.
+ *
+ * Here is the overview of the model maintained by diskBalancer.
+ *
+ * DiskBalancerCluster is a list of DiskBalancerDataNodes.
+ * DiskBalancerDataNode is a collection of DiskBalancerVolumeSets.
+ * DiskBalancerVolumeSet is a collection of DiskBalancerVolumes.
+ * DiskBalancerVolume represents an actual volume on a DataNode.
+ */
\ No newline at end of file
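
A short traversal sketch of that hierarchy (hypothetical helper, assuming a
cluster already populated through a connector or a model file):

import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;

public class ModelWalk {
  /** Walks Cluster -> DataNodes -> VolumeSets -> Volumes. */
  public static void print(DiskBalancerCluster cluster) {
    for (DiskBalancerDataNode node : cluster.getNodes()) {
      for (DiskBalancerVolumeSet vSet : node.getVolumeSets().values()) {
        for (DiskBalancerVolume vol : vSet.getVolumes()) {
          System.out.println(node.getDataNodeUUID() + " "
              + vSet.getStorageType() + " " + vol.getPath());
        }
      }
    }
  }
}
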
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java
new file mode 100644
index 0000000..4bec98f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/package-info.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+/**
+ * Disk Balancer connects to a {@link org.apache.hadoop.hdfs.server.datanode
+ * .DataNode} and attempts to spread data across all volumes evenly.
+ *
+ * This is achieved by:
+ *
+ * 1) Calculating the average data that should be on a set of volumes grouped
+ * by type. For example, how much data should be on each volume of the SSDs
+ * on a machine.
+ *
+ * 2) Once we know the average data that is expected to be on a volume, we
+ * move data from volumes with higher than average load to volumes with
+ * less than average load.
+ *
+ * 3) Disk Balancer operates against data nodes which are live and operational.
+ *
+ */
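
As a toy illustration of step 1 (made-up numbers, not part of this commit):
with three SSD volumes holding 600 GB, 300 GB and 0 GB, the per-volume average
is 300 GB, so roughly 300 GB would move off the first volume toward the third.

public class AverageSketch {
  public static void main(String[] args) {
    long[] usedGb = {600, 300, 0}; // GB used on each SSD volume of one node
    long total = 0;
    for (long used : usedGb) {
      total += used;
    }
    long average = total / usedGb.length; // 300 GB per-volume target
    for (long used : usedGb) {
      // positive: data to move off this volume; negative: room to receive
      System.out.println("surplus: " + (used - average) + " GB");
    }
  }
}
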
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
new file mode 100644
index 0000000..5e3f4bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
+import org.apache.hadoop.util.Time;
+
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Helper class to create various cluster configurations at run time.
+ */
+public class DiskBalancerTestUtil {
+ // we are modeling disks here, hence HDD-style (decimal) units
+ public static final long GB = 1000000000L;
+ public static final long TB = 1000000000000L;
+ private static int[] diskSizes =
+ {1, 2, 3, 4, 5, 6, 7, 8, 9, 100, 200, 300, 400, 500, 600, 700, 800, 900};
+ Random rand;
+ private String stringTable =
+ "ABCDEDFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0987654321";
+
+ /**
+ * Constructs a util class.
+ */
+ public DiskBalancerTestUtil() {
+ this.rand = new Random(Time.monotonicNow());
+ }
+
+ /**
+ * Returns a random string.
+ *
+ * @param length - Number of chars in the string
+ *
+ * @return random String
+ */
+ private String getRandomName(int length) {
+ StringBuilder name = new StringBuilder();
+ for (int x = 0; x < length; x++) {
+ name.append(stringTable.charAt(rand.nextInt(stringTable.length())));
+ }
+ return name.toString();
+ }
+
+ /**
+ * Returns a Random Storage Type.
+ *
+ * @return - StorageType
+ */
+ private StorageType getRandomStorageType() {
+ return StorageType.parseStorageType(rand.nextInt(3));
+ }
+
+ /**
+ * Returns a random capacity; sizes smaller than 10 are
+ * interpreted as TBs, otherwise as GBs.
+ *
+ * @return Long - Disk Size
+ */
+ private long getRandomCapacity() {
+ int size = diskSizes[rand.nextInt(diskSizes.length)];
+ if (size < 10) {
+ return size * TB;
+ } else {
+ return size * GB;
+ }
+ }
+
+ /**
+ * Returns some value under 20% of the capacity, used as the reserved
+ * space in these tests.
+ */
+ private long getRandomReserved(long capacity) {
+ double rcap = capacity * 0.2d;
+ double randDouble = rand.nextDouble();
+ double temp = randDouble * rcap;
+ return (long) temp;
+ }
+
+ /**
+ * Returns some value less than capacity - reserved.
+ */
+ private long getRandomDfsUsed(long capacity, long reserved) {
+ double rcap = capacity - reserved;
+ double randDouble = rand.nextDouble();
+ double temp = randDouble * rcap;
+ return (long) temp;
+ }
+
+ /**
+ * Creates a random volume with a randomly chosen storage type.
+ *
+ * @return Volume
+ */
+ public DiskBalancerVolume createRandomVolume() {
+ return createRandomVolume(getRandomStorageType());
+ }
+
+ /**
+ * Creates a Random Volume for testing purpose.
+ *
+ * @param type - StorageType
+ *
+ * @return DiskBalancerVolume
+ */
+ public DiskBalancerVolume createRandomVolume(StorageType type) {
+ DiskBalancerVolume volume = new DiskBalancerVolume();
+ volume.setPath("/tmp/disk/" + getRandomName(10));
+ volume.setStorageType(type.toString());
+ volume.setTransient(type.isTransient());
+
+ volume.setCapacity(getRandomCapacity());
+ volume.setReserved(getRandomReserved(volume.getCapacity()));
+ volume
+ .setUsed(getRandomDfsUsed(volume.getCapacity(), volume.getReserved()));
+ volume.setUuid(UUID.randomUUID().toString());
+ return volume;
+ }
+
+ /**
+ * Creates a RandomVolumeSet.
+ *
+ * @param type -Storage Type
+ * @param diskCount - How many disks you need.
+ *
+ * @return volumeSet
+ *
+ * @throws Exception
+ */
+ public DiskBalancerVolumeSet createRandomVolumeSet(StorageType type,
+ int diskCount)
+ throws Exception {
+
+ Preconditions.checkState(diskCount > 0);
+ DiskBalancerVolumeSet volumeSet =
+ new DiskBalancerVolumeSet(type.isTransient());
+ for (int x = 0; x < diskCount; x++) {
+ volumeSet.addVolume(createRandomVolume(type));
+ }
+ assert (volumeSet.getVolumeCount() == diskCount);
+ return volumeSet;
+ }
+
+ /**
+ * Creates a RandomDataNode.
+ *
+ * @param diskTypes - Storage types needed in the Node
+ * @param diskCount - Disk count - that many disks of each type are created
+ *
+ * @return DataNode
+ *
+ * @throws Exception
+ */
+ public DiskBalancerDataNode createRandomDataNode(StorageType[] diskTypes,
+ int diskCount)
+ throws Exception {
+ Preconditions.checkState(diskTypes.length > 0);
+ Preconditions.checkState(diskCount > 0);
+
+ DiskBalancerDataNode node =
+ new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+ for (StorageType t : diskTypes) {
+ DiskBalancerVolumeSet vSet = createRandomVolumeSet(t, diskCount);
+ for (DiskBalancerVolume v : vSet.getVolumes()) {
+ node.addVolume(v);
+ }
+ }
+ return node;
+ }
+
+ /**
+ * Creates a RandomCluster.
+ *
+ * @param dataNodeCount - How many nodes you need
+ * @param diskTypes - StorageTypes you need in each node
+ * @param diskCount - How many disks you need of each type.
+ *
+ * @return Cluster
+ *
+ * @throws Exception
+ */
+ public DiskBalancerCluster createRandCluster(int dataNodeCount,
+ StorageType[] diskTypes,
+ int diskCount)
+
+ throws Exception {
+ Preconditions.checkState(diskTypes.length > 0);
+ Preconditions.checkState(diskCount > 0);
+ Preconditions.checkState(dataNodeCount > 0);
+ NullConnector nullConnector = new NullConnector();
+ DiskBalancerCluster cluster = new DiskBalancerCluster(nullConnector);
+
+ // once we add these nodes into the connector, cluster will read them
+ // from the connector.
+ for (int x = 0; x < dataNodeCount; x++) {
+ nullConnector.addNode(createRandomDataNode(diskTypes, diskCount));
+ }
+
+ // with this call we have populated the cluster info
+ cluster.readClusterInfo();
+ return cluster;
+ }
+
+}
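
A quick usage sketch of this helper (the TestDataModels suite below exercises
it in more depth):

import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;

public class TestUtilSketch {
  public static void main(String[] args) throws Exception {
    DiskBalancerTestUtil util = new DiskBalancerTestUtil();
    // 2 nodes, each with 4 random DISK volumes
    DiskBalancerCluster cluster =
        util.createRandCluster(2, new StorageType[] {StorageType.DISK}, 4);
    System.out.println("nodes: " + cluster.getNodes().size()); // 2
  }
}
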
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java
new file mode 100644
index 0000000..3507c96
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDataModels.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.UUID;
+
+public class TestDataModels {
+ @Test
+ public void TestCreateRandomVolume() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerVolume vol = util.createRandomVolume(StorageType.DISK);
+ Assert.assertNotNull(vol.getUuid());
+ Assert.assertNotNull(vol.getPath());
+ Assert.assertNotNull(vol.getStorageType());
+ Assert.assertFalse(vol.isFailed());
+ Assert.assertFalse(vol.isTransient());
+ Assert.assertTrue(vol.getCapacity() > 0);
+ Assert.assertTrue((vol.getCapacity() - vol.getReserved()) > 0);
+ Assert.assertTrue((vol.getReserved() + vol.getUsed()) < vol.getCapacity());
+ }
+
+ @Test
+ public void TestCreateRandomVolumeSet() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerVolumeSet vSet =
+ util.createRandomVolumeSet(StorageType.SSD, 10);
+ Assert.assertEquals(10, vSet.getVolumeCount());
+ Assert.assertEquals(StorageType.SSD.toString(),
+ vSet.getVolumes().get(0).getStorageType());
+
+ }
+
+ @Test
+ public void TestCreateRandomDataNode() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerDataNode node = util.createRandomDataNode(
+ new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 10);
+ Assert.assertNotNull(node.getNodeDataDensity());
+ }
+
+ @Test
+ public void TestDiskQueues() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerDataNode node = util.createRandomDataNode(
+ new StorageType[]{StorageType.DISK, StorageType.RAM_DISK}, 3);
+ TreeSet<DiskBalancerVolume> sortedQueue =
+ node.getVolumeSets().get(StorageType.DISK.toString()).getSortedQueue();
+
+ List<DiskBalancerVolume> reverseList = new LinkedList<>();
+ List<DiskBalancerVolume> highList = new LinkedList<>();
+ int queueSize = sortedQueue.size();
+ for (int x = 0; x < queueSize; x++) {
+ reverseList.add(sortedQueue.first());
+ highList.add(sortedQueue.first());
+ }
+ Collections.reverse(reverseList);
+
+ for (int x = 0; x < queueSize; x++) {
+
+ Assert.assertEquals(reverseList.get(x).getCapacity(),
+ highList.get(x).getCapacity());
+ Assert.assertEquals(reverseList.get(x).getReserved(),
+ highList.get(x).getReserved());
+ Assert.assertEquals(reverseList.get(x).getUsed(),
+ highList.get(x).getUsed());
+ }
+ }
+
+ @Test
+ public void TestNoBalancingNeededEvenDataSpread() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerDataNode node =
+ new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+ // create two disks which have exactly the same data; isBalancingNeeded
+ // should say we don't need to balance.
+ DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
+ v1.setCapacity(DiskBalancerTestUtil.TB);
+ v1.setReserved(100 * DiskBalancerTestUtil.GB);
+ v1.setUsed(500 * DiskBalancerTestUtil.GB);
+
+ DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
+ v2.setCapacity(DiskBalancerTestUtil.TB);
+ v2.setReserved(100 * DiskBalancerTestUtil.GB);
+ v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+ node.addVolume(v1);
+ node.addVolume(v2);
+
+ for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+ Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
+ }
+ }
+
+ @Test
+ public void TestNoBalancingNeededTransientDisks() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerDataNode node =
+ new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+ // create two disks which have different data sizes, but are
+ // transient. isBalancingNeeded should say no balancing is needed.
+ DiskBalancerVolume v1 = util.createRandomVolume(StorageType.RAM_DISK);
+ v1.setCapacity(DiskBalancerTestUtil.TB);
+ v1.setReserved(100 * DiskBalancerTestUtil.GB);
+ v1.setUsed(1 * DiskBalancerTestUtil.GB);
+
+ DiskBalancerVolume v2 = util.createRandomVolume(StorageType.RAM_DISK);
+ v2.setCapacity(DiskBalancerTestUtil.TB);
+ v2.setReserved(100 * DiskBalancerTestUtil.GB);
+ v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+ node.addVolume(v1);
+ node.addVolume(v2);
+
+ for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+ Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
+ }
+ }
+
+ @Test
+ public void TestNoBalancingNeededFailedDisks() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerDataNode node =
+ new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+ // create two normal disks, but fail one of them.
+ // VolumeSet should say no balancing is needed.
+ DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
+ v1.setCapacity(DiskBalancerTestUtil.TB);
+ v1.setReserved(100 * DiskBalancerTestUtil.GB);
+ v1.setUsed(1 * DiskBalancerTestUtil.GB);
+ v1.setFailed(true);
+
+ DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
+ v2.setCapacity(DiskBalancerTestUtil.TB);
+ v2.setReserved(100 * DiskBalancerTestUtil.GB);
+ v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+ node.addVolume(v1);
+ node.addVolume(v2);
+
+ for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+ Assert.assertFalse(vsets.isBalancingNeeded(10.0f));
+ }
+ }
+
+ @Test
+ public void TestNeedBalancingUnevenDataSpread() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerDataNode node =
+ new DiskBalancerDataNode(UUID.randomUUID().toString());
+
+ DiskBalancerVolume v1 = util.createRandomVolume(StorageType.SSD);
+ v1.setCapacity(DiskBalancerTestUtil.TB);
+ v1.setReserved(100 * DiskBalancerTestUtil.GB);
+ v1.setUsed(0);
+
+ DiskBalancerVolume v2 = util.createRandomVolume(StorageType.SSD);
+ v2.setCapacity(DiskBalancerTestUtil.TB);
+ v2.setReserved(100 * DiskBalancerTestUtil.GB);
+ v2.setUsed(500 * DiskBalancerTestUtil.GB);
+
+ node.addVolume(v1);
+ node.addVolume(v2);
+
+ for (DiskBalancerVolumeSet vsets : node.getVolumeSets().values()) {
+ Assert.assertTrue(vsets.isBalancingNeeded(10.0f));
+ }
+ }
+
+ @Test
+ public void TestVolumeSerialize() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+ DiskBalancerVolume volume = util.createRandomVolume(StorageType.DISK);
+ String originalString = volume.toJson();
+ DiskBalancerVolume parsedVolume =
+ DiskBalancerVolume.parseJson(originalString);
+ String parsedString = parsedVolume.toJson();
+ Assert.assertEquals(originalString, parsedString);
+ }
+
+ @Test
+ public void TestClusterSerialize() throws Exception {
+ DiskBalancerTestUtil util = new DiskBalancerTestUtil();
+
+ // Create a Cluster with 3 datanodes, 3 disk types and 3 disks of each type,
+ // that is, 9 disks in each machine.
+ DiskBalancerCluster cluster = util.createRandCluster(3, new StorageType[]{
+ StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD}, 3);
+
+ DiskBalancerCluster newCluster =
+ DiskBalancerCluster.parseJson(cluster.toJson());
+ Assert.assertEquals(cluster.getNodes(), newCluster.getNodes());
+ Assert
+ .assertEquals(cluster.getNodes().size(), newCluster.getNodes().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91a5c481/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java
new file mode 100644
index 0000000..3f78530
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/connectors/NullConnector.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.server.diskbalancer.connectors;
+
+import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This connector allows the user to create an in-memory cluster
+ * and is useful in testing.
+ */
+public class NullConnector implements ClusterConnector {
+ private final List<DiskBalancerDataNode> nodes = new LinkedList<>();
+
+ /**
+ * getNodes function returns a list of DiskBalancerDataNodes.
+ *
+ * @return List of DiskBalancerDataNodes
+ */
+ @Override
+ public List<DiskBalancerDataNode> getNodes() throws Exception {
+ return nodes;
+ }
+
+ /**
+ * Returns info about the connector.
+ *
+ * @return String.
+ */
+ @Override
+ public String getConnectorInfo() {
+ return "Null Connector : No persistence, in-memory connector";
+ }
+
+ /**
+ * Allows the user to add nodes to this connector.
+ *
+ * @param node - Node to add
+ */
+ public void addNode(DiskBalancerDataNode node) {
+ nodes.add(node);
+ }
+}