Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/02/09 21:32:53 UTC

[1/2] hadoop git commit: HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 b1aad1d94 -> ff900eb64
  refs/heads/trunk 241336ca2 -> ab934e859


HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab934e85
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab934e85
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab934e85

Branch: refs/heads/trunk
Commit: ab934e85947dcf2092050023909dd81ae274ff45
Parents: 241336c
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Feb 9 12:17:40 2015 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Feb 9 12:17:40 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/net/NetworkTopology.java  |  2 +-
 .../net/NetworkTopologyWithNodeGroup.java       |  2 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hadoop/hdfs/protocol/LocatedBlock.java      | 77 ++++++++++++++----
 .../server/blockmanagement/DatanodeManager.java |  2 +
 .../protocol/DatanodeInfoWithStorage.java       | 59 ++++++++++++++
 .../apache/hadoop/hdfs/TestDecommission.java    | 10 ++-
 .../blockmanagement/TestDatanodeManager.java    | 84 ++++++++++++++++++++
 8 files changed, 218 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index aaa5ae3..fc8bf52 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -859,7 +859,7 @@ public class NetworkTopology {
     // Start off by initializing to off rack
     int weight = 2;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameRack(reader, node)) {
         weight = 1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index 13160eb..3de49dc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -254,7 +254,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
     // Start off by initializing to off rack
     int weight = 3;
     if (reader != null) {
-      if (reader == node) {
+      if (reader.equals(node)) {
         weight = 0;
       } else if (isOnSameNodeGroup(reader, node)) {
         weight = 1;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index eda3744..4396e3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and
     HdfsDataInputStream. (yliu)
 
+    HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos
+    but not StorageIDs. (Milan Desai via Arpit Agarwal)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 30368f6..7fb2e30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -41,11 +42,13 @@ public class LocatedBlock {
 
   private final ExtendedBlock b;
   private long offset;  // offset of the first byte of the block in the file
-  private final DatanodeInfo[] locs;
-  /** Storage ID for each replica */
-  private final String[] storageIDs;
-  // Storage type for each replica, if reported.
-  private final StorageType[] storageTypes;
+  private final DatanodeInfoWithStorage[] locs;
+  private final boolean hasStorageIDs;
+  private final boolean hasStorageTypes;
+  /** Cached storage ID for each replica */
+  private String[] storageIDs;
+  /** Cached storage type for each replica, if reported. */
+  private StorageType[] storageTypes;
   // corrupt flag is true if all of the replicas of a block are corrupt.
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
@@ -57,7 +60,8 @@ public class LocatedBlock {
   private DatanodeInfo[] cachedLocs;
 
   // Used when there are no locations
-  private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
+  private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
+      new DatanodeInfoWithStorage[0];
 
   public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
     this(b, locs, -1, false); // startOffset is unknown
@@ -94,10 +98,22 @@ public class LocatedBlock {
     if (locs==null) {
       this.locs = EMPTY_LOCS;
     } else {
-      this.locs = locs;
+      this.locs = new DatanodeInfoWithStorage[locs.length];
+      for(int i = 0; i < locs.length; i++) {
+        DatanodeInfo di = locs[i];
+        DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
+            storageIDs != null ? storageIDs[i] : null,
+            storageTypes != null ? storageTypes[i] : null);
+        storage.setDependentHostNames(di.getDependentHostNames());
+        storage.setLevel(di.getLevel());
+        storage.setParent(di.getParent());
+        this.locs[i] = storage;
+      }
     }
     this.storageIDs = storageIDs;
     this.storageTypes = storageTypes;
+    this.hasStorageIDs = storageIDs != null;
+    this.hasStorageTypes = storageTypes != null;
 
     if (cachedLocs == null || cachedLocs.length == 0) {
       this.cachedLocs = EMPTY_LOCS;
@@ -118,18 +134,53 @@ public class LocatedBlock {
     return b;
   }
 
-  public DatanodeInfo[] getLocations() {
+  /**
+   * Returns the locations associated with this block. The returned array is not
+   * expected to be modified. If it is, the caller must immediately invoke
+   * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo}
+   * to invalidate the cached Storage ID/Type arrays.
+   */
+  public DatanodeInfoWithStorage[] getLocations() {
     return locs;
   }
 
   public StorageType[] getStorageTypes() {
+    if(!hasStorageTypes) {
+      return null;
+    }
+    if(storageTypes != null) {
+      return storageTypes;
+    }
+    storageTypes = new StorageType[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageTypes[i] = locs[i].getStorageType();
+    }
     return storageTypes;
   }
   
   public String[] getStorageIDs() {
+    if(!hasStorageIDs) {
+      return null;
+    }
+    if(storageIDs != null) {
+      return storageIDs;
+    }
+    storageIDs = new String[locs.length];
+    for(int i = 0; i < locs.length; i++) {
+      storageIDs[i] = locs[i].getStorageID();
+    }
     return storageIDs;
   }
 
+  /**
+   * Invalidates the cached StorageID and StorageType information. Must be
+   * called when the locations array is modified.
+   */
+  public void invalidateCachedStorageInfo() {
+    storageIDs = null;
+    storageTypes = null;
+  }
+
   public long getStartOffset() {
     return offset;
   }
@@ -161,9 +212,9 @@ public class LocatedBlock {
       return;
     }
     // Try to re-use a DatanodeInfo already in loc
-    for (int i=0; i<locs.length; i++) {
-      if (locs[i].equals(loc)) {
-        cachedList.add(locs[i]);
+    for (DatanodeInfoWithStorage di : locs) {
+      if (loc.equals(di)) {
+        cachedList.add(di);
         cachedLocs = cachedList.toArray(cachedLocs);
         return;
       }
@@ -187,10 +238,6 @@ public class LocatedBlock {
         + "; corrupt=" + corrupt
         + "; offset=" + offset
         + "; locs=" + Arrays.asList(locs)
-        + "; storageIDs=" +
-            (storageIDs != null ? Arrays.asList(storageIDs) : null)
-        + "; storageTypes=" +
-            (storageTypes != null ? Arrays.asList(storageTypes) : null)
         + "}";
   }
 }
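
For context, a minimal sketch of the caller contract this change introduces on LocatedBlock (the method and variable names below are illustrative, not part of the patch): if the array returned by getLocations() is reordered in place, invalidateCachedStorageInfo() must be called before the storage ID/type getters are read again, so that the lazily cached arrays are rebuilt from the updated locations.

    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;

    // Hypothetical caller; "block" is any LocatedBlock carrying storage info.
    static void reorderAndReread(LocatedBlock block) {
      DatanodeInfoWithStorage[] locs = block.getLocations();
      // In-place mutation of the shared array (here: reversing it) leaves the
      // cached storageIDs/storageTypes arrays stale.
      for (int i = 0, j = locs.length - 1; i < j; i++, j--) {
        DatanodeInfoWithStorage tmp = locs[i];
        locs[i] = locs[j];
        locs[j] = tmp;
      }
      block.invalidateCachedStorageInfo();
      // Rebuilt lazily from locs[i].getStorageID()/getStorageType(), so the
      // arrays stay aligned with the reordered locations.
      String[] ids = block.getStorageIDs();
    }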

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a33d990..c166e94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -391,6 +391,8 @@ public class DatanodeManager {
       }
       int activeLen = lastActiveIndex + 1;      
       networktopology.sortByDistance(client, b.getLocations(), activeLen);
+      // must invalidate cache since we modified locations array
+      b.invalidateCachedStorageInfo();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java
new file mode 100644
index 0000000..ec8c346
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+public class DatanodeInfoWithStorage extends DatanodeInfo {
+  private final String storageID;
+  private final StorageType storageType;
+
+  public DatanodeInfoWithStorage(DatanodeInfo from, String storageID,
+                                 StorageType storageType) {
+    super(from);
+    this.storageID = storageID;
+    this.storageType = storageType;
+  }
+
+  public String getStorageID() {
+    return storageID;
+  }
+
+  public StorageType getStorageType() {
+    return storageType;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    // allows this class to be used interchangeably with DatanodeInfo
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    // allows this class to be used interchangeably with DatanodeInfo
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "DatanodeInfoWithStorage[" + super.toString() + "," + storageID +
+        "," + storageType + "]";
+  }
+}
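
As the equals()/hashCode() comments above note, the wrapper is meant to compare equal to the DatanodeInfo it wraps. A small illustration, under the assumption that "dn" is a registered node's DatanodeInfo obtained elsewhere and the storage values are placeholders:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;

    static void illustrate(DatanodeInfo dn) {
      DatanodeInfoWithStorage ws =
          new DatanodeInfoWithStorage(dn, "storageID-0", StorageType.DISK);
      // equals()/hashCode() delegate to DatanodeInfo, so the wrapper is
      // interchangeable with the wrapped node in existing collections.
      assert ws.equals(dn) && dn.equals(ws);
      Set<DatanodeInfo> nodes = new HashSet<>();
      nodes.add(dn);
      assert nodes.contains(ws);
    }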

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index 24e5db6..35c0d8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -801,22 +802,23 @@ public class TestDecommission {
     
     ArrayList<String> nodes = new ArrayList<String>();
     ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
-   
+
+    DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
     for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
       DatanodeInfo found = datanodeInfo;
       for (DatanodeInfo dif: dnInfos4LastBlock) {
         if (datanodeInfo.equals(dif)) {
-         found = null;         
+         found = null;
         }
       }
       if (found != null) {
         nodes.add(found.getXferAddr());
-        dnInfos.add(found);
+        dnInfos.add(dm.getDatanode(found));
       }
     }
     //decommission one of the 3 nodes which have last block
     nodes.add(dnInfos4LastBlock[0].getXferAddr());
-    dnInfos.add(dnInfos4LastBlock[0]);
+    dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
     
     writeConfigFile(excludeFile, nodes);
     refreshNodes(ns, conf);  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 2c65fff..adf31a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -31,13 +32,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.*;
 
 public class TestDatanodeManager {
@@ -210,4 +217,81 @@ public class TestDatanodeManager {
     public void reloadCachedMappings(List<String> names) {  
     }
   }
+
+  /**
+   * This test creates a LocatedBlock with 5 locations, sorts the locations
+   * based on the network topology, and ensures the locations are still aligned
+   * with the storage ids and storage types.
+   */
+  @Test
+  public void testSortLocatedBlocks() throws IOException {
+    // create the DatanodeManager which will be tested
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
+        fsn, new Configuration());
+
+    // register 5 datanodes, each with different storage ID and type
+    DatanodeInfo[] locs = new DatanodeInfo[5];
+    String[] storageIDs = new String[5];
+    StorageType[] storageTypes = new StorageType[]{
+        StorageType.ARCHIVE,
+        StorageType.DEFAULT,
+        StorageType.DISK,
+        StorageType.RAM_DISK,
+        StorageType.SSD
+    };
+    for(int i = 0; i < 5; i++) {
+      // register new datanode
+      String uuid = "UUID-"+i;
+      String ip = "IP-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
+      Mockito.when(dr.getXferPort()).thenReturn(9000);
+      Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
+      dm.registerDatanode(dr);
+
+      // get location and storage information
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-"+i;
+    }
+
+    // set first 2 locations as decommissioned
+    locs[0].setDecommissioned();
+    locs[1].setDecommissioned();
+
+    // create LocatedBlock with above locations
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    final String targetIp = locs[4].getIpAddr();
+
+    // sort block locations
+    dm.sortLocatedBlocks(targetIp, blocks);
+
+    // check that storage IDs/types are aligned with datanode locs
+    DatanodeInfoWithStorage[] sortedLocs = block.getLocations();
+    storageIDs = block.getStorageIDs();
+    storageTypes = block.getStorageTypes();
+    assertThat(sortedLocs.length, is(5));
+    assertThat(storageIDs.length, is(5));
+    assertThat(storageTypes.length, is(5));
+    for(int i = 0; i < sortedLocs.length; i++) {
+      assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i]));
+      assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i]));
+    }
+
+    // Ensure the local node is first.
+    assertThat(sortedLocs[0].getIpAddr(), is(targetIp));
+
+    // Ensure the two decommissioned DNs were moved to the end.
+    assertThat(sortedLocs[sortedLocs.length-1].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+  }
 }


[2/2] hadoop git commit: HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)

Posted by ar...@apache.org.
HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff900eb6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff900eb6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff900eb6

Branch: refs/heads/branch-2
Commit: ff900eb64a59e216af6073c769f9acdff8d4f812
Parents: b1aad1d
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Feb 9 12:17:40 2015 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Feb 9 12:17:53 2015 -0800

----------------------------------------------------------------------
 (This is the same patch as trunk commit ab934e85 above, applied to
  branch-2; the diffstat and per-file changes are identical.)
----------------------------------------------------------------------