You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/02/17 20:30:18 UTC

hadoop git commit: HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages. Contributed by Wei Zhou.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 77f7ca3e9 -> 3a23dc683


HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages. Contributed by Wei Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a23dc68
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a23dc68
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a23dc68

Branch: refs/heads/trunk
Commit: 3a23dc683c058d3a5262ae9dca2d1c8c588a6a3e
Parents: 77f7ca3
Author: Andrew Wang <wa...@apache.org>
Authored: Wed Feb 17 11:29:10 2016 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Wed Feb 17 11:29:58 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../AvailableSpaceVolumeChoosingPolicy.java     | 35 ++++++++++--
 .../RoundRobinVolumeChoosingPolicy.java         | 53 +++++++++++++-----
 .../TestRoundRobinVolumeChoosingPolicy.java     | 56 ++++++++++++++++++++
 4 files changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a23dc68/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5ca7f8a..36bb60d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1021,6 +1021,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9691. TestBlockManagerSafeMode#testCheckSafeMode fails intermittently.
     (Mingliang Liu via aajisaka)
 
+    HDFS-9608. Disk IO imbalance in HDFS with heterogeneous storages.
+    (Wei Zhou via wang)
+
 Release 2.8.0 - UNRELEASED
 
   NEW FEATURES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a23dc68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index d0d36ba..39d9547 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
@@ -39,11 +40,15 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
  * new replica allocation. By default this policy prefers assigning replicas to
  * those volumes with more available free space, so as to over time balance the
  * available space of all the volumes within a DN.
+ * Uses one lock per storage type so that volumes of different storage
+ * types can be chosen concurrently.
  */
 public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
     implements VolumeChoosingPolicy<V>, Configurable {
   
   private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
+
+  private Object[] syncLocks;
   
   private final Random random;
   
@@ -52,14 +57,24 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
 
   AvailableSpaceVolumeChoosingPolicy(Random random) {
     this.random = random;
+    initLocks();
   }
 
   public AvailableSpaceVolumeChoosingPolicy() {
     this(new Random());
+    initLocks();
+  }
+
+  private void initLocks() {
+    int numStorageTypes = StorageType.values().length;
+    syncLocks = new Object[numStorageTypes];
+    for (int i = 0; i < numStorageTypes; i++) {
+      syncLocks[i] = new Object();
+    }
   }
 
   @Override
-  public synchronized void setConf(Configuration conf) {
+  public void setConf(Configuration conf) {
     balancedSpaceThreshold = conf.getLong(
         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_KEY,
         DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT);
@@ -85,7 +100,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
   }
   
   @Override
-  public synchronized Configuration getConf() {
+  public Configuration getConf() {
     // Nothing to do. Only added to fulfill the Configurable contract.
     return null;
   }
@@ -98,12 +113,24 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
       new RoundRobinVolumeChoosingPolicy<V>();
 
   @Override
-  public synchronized V chooseVolume(List<V> volumes,
+  public V chooseVolume(List<V> volumes,
       long replicaSize) throws IOException {
     if (volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
-    
+    // All the items in volumes have the same storage type, so we only
+    // need to get the storage type index of the first item in volumes.
+    StorageType storageType = volumes.get(0).getStorageType();
+    int index = storageType != null ?
+            storageType.ordinal() : StorageType.DEFAULT.ordinal();
+
+    synchronized (syncLocks[index]) {
+      return doChooseVolume(volumes, replicaSize);
+    }
+  }
+
+  private V doChooseVolume(final List<V> volumes,
+                         long replicaSize) throws IOException {
     AvailableSpaceVolumeList volumesWithSpaces =
         new AvailableSpaceVolumeList(volumes);
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a23dc68/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 19452d8..9474b92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -22,46 +22,75 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
- * Choose volumes in round-robin order.
+ * Choose volumes with the same storage type in round-robin order.
+ * Use one lock per storage type to synchronize volume choosing.
  */
 public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
     implements VolumeChoosingPolicy<V> {
   public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
 
-  private int curVolume = 0;
+  // curVolumes stores the round-robin counter of each storage type.
+  // The ordinal of the storage type in org.apache.hadoop.fs.StorageType
+  // is used as the index into the array.
+  private int[] curVolumes;
+  // syncLocks stores the locks for each storage type.
+  private Object[] syncLocks;
+
+  public RoundRobinVolumeChoosingPolicy() {
+    int numStorageTypes = StorageType.values().length;
+    curVolumes = new int[numStorageTypes];
+    syncLocks = new Object[numStorageTypes];
+    for (int i = 0; i < numStorageTypes; i++) {
+      syncLocks[i] = new Object();
+    }
+  }
 
   @Override
-  public synchronized V chooseVolume(final List<V> volumes, long blockSize)
+  public V chooseVolume(final List<V> volumes, long blockSize)
       throws IOException {
 
-    if(volumes.size() < 1) {
+    if (volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
-    
+
+    // All the items in volumes have the same storage type, so we only
+    // need to get the storage type index of the first item in volumes.
+    StorageType storageType = volumes.get(0).getStorageType();
+    int index = storageType != null ?
+            storageType.ordinal() : StorageType.DEFAULT.ordinal();
+
+    synchronized (syncLocks[index]) {
+      return chooseVolume(index, volumes, blockSize);
+    }
+  }
+
+  private V chooseVolume(final int curVolumeIndex, final List<V> volumes,
+                         long blockSize) throws IOException {
     // since volumes could've been removed because of the failure
     // make sure we are not out of bounds
-    if(curVolume >= volumes.size()) {
-      curVolume = 0;
-    }
-    
+    int curVolume = curVolumes[curVolumeIndex] < volumes.size()
+            ? curVolumes[curVolumeIndex] : 0;
+
     int startVolume = curVolume;
     long maxAvailable = 0;
-    
+
     while (true) {
       final V volume = volumes.get(curVolume);
       curVolume = (curVolume + 1) % volumes.size();
       long availableVolumeSize = volume.getAvailable();
       if (availableVolumeSize > blockSize) {
+        curVolumes[curVolumeIndex] = curVolume;
         return volume;
       }
-      
+
       if (availableVolumeSize > maxAvailable) {
         maxAvailable = availableVolumeSize;
       }
-      
+
       if (curVolume == startVolume) {
         throw new DiskOutOfSpaceException("Out of space: "
             + "The volume with the most available space (=" + maxAvailable

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a23dc68/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
index 9818a01..9b3047f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.junit.Assert;
@@ -102,4 +103,59 @@ public class TestRoundRobinVolumeChoosingPolicy {
     }
   }
 
+  // Test Round-Robin choosing algorithm with heterogeneous storage.
+  @Test
+  public void testRRPolicyWithStorageTypes() throws Exception {
+    final RoundRobinVolumeChoosingPolicy<FsVolumeSpi> policy
+            = new RoundRobinVolumeChoosingPolicy<FsVolumeSpi>();
+    testRRPolicyWithStorageTypes(policy);
+  }
+
+  public static void testRRPolicyWithStorageTypes(
+      VolumeChoosingPolicy<FsVolumeSpi> policy) throws Exception {
+    final List<FsVolumeSpi> diskVolumes = new ArrayList<FsVolumeSpi>();
+    final List<FsVolumeSpi> ssdVolumes = new ArrayList<FsVolumeSpi>();
+
+    // Add two DISK volumes to diskVolumes
+    diskVolumes.add(Mockito.mock(FsVolumeSpi.class));
+    Mockito.when(diskVolumes.get(0).getStorageType())
+            .thenReturn(StorageType.DISK);
+    Mockito.when(diskVolumes.get(0).getAvailable()).thenReturn(100L);
+    diskVolumes.add(Mockito.mock(FsVolumeSpi.class));
+    Mockito.when(diskVolumes.get(1).getStorageType())
+            .thenReturn(StorageType.DISK);
+    Mockito.when(diskVolumes.get(1).getAvailable()).thenReturn(100L);
+
+    // Add two SSD volumes to ssdVolumes
+    ssdVolumes.add(Mockito.mock(FsVolumeSpi.class));
+    Mockito.when(ssdVolumes.get(0).getStorageType())
+            .thenReturn(StorageType.SSD);
+    Mockito.when(ssdVolumes.get(0).getAvailable()).thenReturn(200L);
+    ssdVolumes.add(Mockito.mock(FsVolumeSpi.class));
+    Mockito.when(ssdVolumes.get(1).getStorageType())
+            .thenReturn(StorageType.SSD);
+    Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L);
+
+    Assert.assertEquals(diskVolumes.get(0),
+            policy.chooseVolume(diskVolumes, 0));
+    // Independent Round-Robin for different storage type
+    Assert.assertEquals(ssdVolumes.get(0),
+            policy.chooseVolume(ssdVolumes, 0));
+    // Take block size into consideration
+    Assert.assertEquals(ssdVolumes.get(0),
+            policy.chooseVolume(ssdVolumes, 150L));
+
+    Assert.assertEquals(diskVolumes.get(1),
+            policy.chooseVolume(diskVolumes, 0));
+    Assert.assertEquals(diskVolumes.get(0),
+            policy.chooseVolume(diskVolumes, 50L));
+
+    try {
+      policy.chooseVolume(diskVolumes, 200L);
+      Assert.fail("Should throw an DiskOutOfSpaceException before this!");
+    } catch (DiskOutOfSpaceException e) {
+      // Pass.
+    }
+  }
+
 }