Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2017/12/02 02:38:56 UTC

[43/50] [abbrv] hadoop git commit: HDFS-12809. [READ] Fix the randomized selection of locations in {{ProvidedBlocksBuilder}}.

HDFS-12809. [READ] Fix the randomized selection of locations in {{ProvidedBlocksBuilder}}.
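The previous implementation of chooseRandom() in ProvidedDescriptor walked the datanode map via dns.ceilingEntry(UUID.randomUUID().toString()), which, as its own TODO noted, is not uniformly random and is skewed toward sparse sections of the ID space. This patch keeps a parallel list of the provided datanodes (dnR) and draws from it with a partial Fisher-Yates shuffle, so every non-excluded datanode is equally likely to be picked and excluded ones are never examined twice. ProvidedBlocksBuilder now also appends provided locations only after all local locations have been examined, and the test asserts that the returned locations are unique. A standalone sketch of the new selection loop follows the first diff below.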


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a3ab228
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a3ab228
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a3ab228

Branch: refs/heads/HDFS-9806
Commit: 6a3ab2282025b90c5e14898796b5a20725b54cfd
Parents: 1151f04
Author: Virajith Jalaparti <vi...@apache.org>
Authored: Mon Nov 27 17:04:20 2017 -0800
Committer: Virajith Jalaparti <vi...@apache.org>
Committed: Fri Dec 1 18:16:59 2017 -0800

----------------------------------------------------------------------
 .../blockmanagement/ProvidedStorageMap.java     | 112 +++++++------------
 .../TestNameNodeProvidedImplementation.java     |  26 ++++-
 2 files changed, 61 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3ab228/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
index 6fec977..c85eb2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ProvidedStorageMap.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -229,11 +230,8 @@ public class ProvidedStorageMap {
         sids.add(currInfo.getStorageID());
         types.add(storageType);
         if (StorageType.PROVIDED.equals(storageType)) {
-          DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
-          locs.add(
-              new DatanodeInfoWithStorage(
-                  dn, currInfo.getStorageID(), currInfo.getStorageType()));
-          excludedUUids.add(dn.getDatanodeUuid());
+          // Provided location will be added to the list of locations after
+          // examining all local locations.
           isProvidedBlock = true;
         } else {
           locs.add(new DatanodeInfoWithStorage(
@@ -245,11 +243,17 @@ public class ProvidedStorageMap {
 
       int numLocations = locs.size();
       if (isProvidedBlock) {
+        // add the first datanode here
+        DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+        locs.add(
+            new DatanodeInfoWithStorage(dn, storageId, StorageType.PROVIDED));
+        excludedUUids.add(dn.getDatanodeUuid());
+        numLocations++;
         // add more replicas until we reach the defaultReplication
         for (int count = numLocations + 1;
             count <= defaultReplication && count <= providedDescriptor
                 .activeProvidedDatanodes(); count++) {
-          DatanodeDescriptor dn = chooseProvidedDatanode(excludedUUids);
+          dn = chooseProvidedDatanode(excludedUUids);
           locs.add(new DatanodeInfoWithStorage(
               dn, storageId, StorageType.PROVIDED));
           sids.add(storageId);
@@ -284,6 +288,9 @@ public class ProvidedStorageMap {
 
     private final NavigableMap<String, DatanodeDescriptor> dns =
         new ConcurrentSkipListMap<>();
+    // maintain a separate list of the datanodes with provided storage
+    // to efficiently choose Datanodes when required.
+    private final List<DatanodeDescriptor> dnR = new ArrayList<>();
     public final static String NETWORK_LOCATION = "/REMOTE";
     public final static String NAME = "PROVIDED";
 
@@ -300,8 +307,8 @@ public class ProvidedStorageMap {
 
     DatanodeStorageInfo getProvidedStorage(
         DatanodeDescriptor dn, DatanodeStorage s) {
-      LOG.info("XXXXX adding Datanode " + dn.getDatanodeUuid());
       dns.put(dn.getDatanodeUuid(), dn);
+      dnR.add(dn);
       // TODO: maintain separate RPC ident per dn
       return storageMap.get(s.getStorageID());
     }
@@ -315,84 +322,42 @@ public class ProvidedStorageMap {
     }
 
     DatanodeDescriptor choose(DatanodeDescriptor client) {
-      // exact match for now
-      DatanodeDescriptor dn = client != null ?
-          dns.get(client.getDatanodeUuid()) : null;
-      if (null == dn) {
-        dn = chooseRandom();
-      }
-      return dn;
+      return choose(client, Collections.<String>emptySet());
     }
 
     DatanodeDescriptor choose(DatanodeDescriptor client,
         Set<String> excludedUUids) {
       // exact match for now
-      DatanodeDescriptor dn = client != null ?
-          dns.get(client.getDatanodeUuid()) : null;
-
-      if (null == dn || excludedUUids.contains(client.getDatanodeUuid())) {
-        dn = null;
-        Set<String> exploredUUids = new HashSet<String>();
-
-        while(exploredUUids.size() < dns.size()) {
-          Map.Entry<String, DatanodeDescriptor> d =
-                  dns.ceilingEntry(UUID.randomUUID().toString());
-          if (null == d) {
-            d = dns.firstEntry();
-          }
-          String uuid = d.getValue().getDatanodeUuid();
-          //this node has already been explored, and was not selected earlier
-          if (exploredUUids.contains(uuid)) {
-            continue;
-          }
-          exploredUUids.add(uuid);
-          //this node has been excluded
-          if (excludedUUids.contains(uuid)) {
-            continue;
-          }
-          return dns.get(uuid);
-        }
-      }
-
-      return dn;
-    }
-
-    DatanodeDescriptor chooseRandom(DatanodeStorageInfo[] excludedStorages) {
-      // TODO: Currently this is not uniformly random;
-      // skewed toward sparse sections of the ids
-      Set<DatanodeDescriptor> excludedNodes =
-          new HashSet<DatanodeDescriptor>();
-      if (excludedStorages != null) {
-        for (int i= 0; i < excludedStorages.length; i++) {
-          LOG.info("Excluded: " + excludedStorages[i].getDatanodeDescriptor());
-          excludedNodes.add(excludedStorages[i].getDatanodeDescriptor());
+      if (client != null && !excludedUUids.contains(client.getDatanodeUuid())) {
+        DatanodeDescriptor dn = dns.get(client.getDatanodeUuid());
+        if (dn != null) {
+          return dn;
         }
       }
-      Set<DatanodeDescriptor> exploredNodes = new HashSet<DatanodeDescriptor>();
 
-      while(exploredNodes.size() < dns.size()) {
-        Map.Entry<String, DatanodeDescriptor> d =
-            dns.ceilingEntry(UUID.randomUUID().toString());
-        if (null == d) {
-          d = dns.firstEntry();
-        }
-        DatanodeDescriptor node = d.getValue();
-        //this node has already been explored, and was not selected earlier
-        if (exploredNodes.contains(node)) {
-          continue;
+      Random r = new Random();
+      for (int i = dnR.size() - 1; i >= 0; --i) {
+        int pos = r.nextInt(i + 1);
+        DatanodeDescriptor node = dnR.get(pos);
+        String uuid = node.getDatanodeUuid();
+        if (!excludedUUids.contains(uuid)) {
+          return node;
         }
-        exploredNodes.add(node);
-        //this node has been excluded
-        if (excludedNodes.contains(node)) {
-          continue;
-        }
-        return node;
+        Collections.swap(dnR, i, pos);
       }
       return null;
     }
 
-    DatanodeDescriptor chooseRandom() {
-      return chooseRandom(null);
+    DatanodeDescriptor chooseRandom(DatanodeStorageInfo... excludedStorages) {
+      Set<String> excludedNodes = new HashSet<>();
+      if (excludedStorages != null) {
+        for (int i = 0; i < excludedStorages.length; i++) {
+          DatanodeDescriptor dn = excludedStorages[i].getDatanodeDescriptor();
+          String uuid = dn.getDatanodeUuid();
+          excludedNodes.add(uuid);
+        }
+      }
+      return choose(null, excludedNodes);
     }
 
     @Override
@@ -414,6 +379,7 @@ public class ProvidedStorageMap {
         DatanodeDescriptor storedDN = dns.get(dnToRemove.getDatanodeUuid());
         if (storedDN != null) {
           dns.remove(dnToRemove.getDatanodeUuid());
+          dnR.remove(dnToRemove);
         }
       }
       return dns.size();

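For reference, here is a standalone sketch of the selection loop added above, runnable outside Hadoop. Names are simplified (String stands in for DatanodeDescriptor, and the class name is invented for illustration); only the technique matches the patch.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class UniformChooserSketch {
  private final List<String> dnR = new ArrayList<>();
  private final Random r = new Random();

  void add(String uuid) {
    dnR.add(uuid);
  }

  String choose(Set<String> excludedUuids) {
    // Partial Fisher-Yates: each pass draws uniformly from the prefix
    // [0, i]; an excluded pick is swapped into the shrinking tail so it
    // is never examined twice. At most |excluded| + 1 draws are needed.
    for (int i = dnR.size() - 1; i >= 0; --i) {
      int pos = r.nextInt(i + 1);
      String candidate = dnR.get(pos);
      if (!excludedUuids.contains(candidate)) {
        return candidate;
      }
      Collections.swap(dnR, i, pos);
    }
    return null; // every datanode was excluded
  }
}

Because each failed draw retires one excluded entry to the tail, the loop terminates quickly even when most datanodes are excluded, unlike the old retry-until-unexplored walk it replaces.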
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3ab228/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
index 9c82967..09e8f97 100644
--- a/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
+++ b/hadoop-tools/hadoop-fs2img/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeProvidedImplementation.java
@@ -27,8 +27,11 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
+import java.util.Set;
+
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -480,16 +483,31 @@ public class TestNameNodeProvidedImplementation {
     // given the start and length in the above call,
     // only one LocatedBlock in LocatedBlocks
     assertEquals(expectedBlocks, locatedBlocks.getLocatedBlocks().size());
-    LocatedBlock locatedBlock = locatedBlocks.getLocatedBlocks().get(0);
-    assertEquals(expectedLocations, locatedBlock.getLocations().length);
-    return locatedBlock.getLocations();
+    DatanodeInfo[] locations =
+        locatedBlocks.getLocatedBlocks().get(0).getLocations();
+    assertEquals(expectedLocations, locations.length);
+    checkUniqueness(locations);
+    return locations;
+  }
+
+  /**
+   * verify that the given locations are all unique.
+   * @param locations
+   */
+  private void checkUniqueness(DatanodeInfo[] locations) {
+    Set<String> set = new HashSet<>();
+    for (DatanodeInfo info: locations) {
+      assertFalse("All locations should be unique",
+          set.contains(info.getDatanodeUuid()));
+      set.add(info.getDatanodeUuid());
+    }
   }
 
   /**
    * Tests setting replication of provided files.
    * @throws Exception
    */
-  @Test(timeout=30000)
+  @Test(timeout=50000)
   public void testSetReplicationForProvidedFiles() throws Exception {
     createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
         FixedBlockResolver.class);

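A hypothetical driver tying the two changes together (it reuses the UniformChooserSketch class from the sketch above and assumes both are in the same package). It mirrors the new ProvidedBlocksBuilder flow, first provided location then top-up to defaultReplication, and checks the same uniqueness invariant the new checkUniqueness assertion enforces; the extra bound on activeProvidedDatanodes() in the real patch is omitted for brevity.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FillLocationsSketch {
  public static void main(String[] args) {
    UniformChooserSketch chooser = new UniformChooserSketch();
    for (int i = 0; i < 5; i++) {
      chooser.add("dn-" + i);
    }

    int defaultReplication = 3;
    List<String> locs = new ArrayList<>();
    Set<String> excluded = new HashSet<>();

    // Add the first provided location, then more replicas until we
    // reach defaultReplication, excluding prior picks each time.
    String dn = chooser.choose(excluded);
    locs.add(dn);
    excluded.add(dn);
    for (int count = locs.size() + 1; count <= defaultReplication; count++) {
      dn = chooser.choose(excluded);
      locs.add(dn);
      excluded.add(dn);
    }

    // All locations should be unique, as the test now verifies.
    if (new HashSet<>(locs).size() != locs.size()) {
      throw new AssertionError("All locations should be unique");
    }
    System.out.println("locations: " + locs);
  }
}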
