You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/02/11 20:48:21 UTC
[03/50] [abbrv] hadoop git commit: HDFS-7647.
DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs.
(Contributed by Milan Desai)
HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos but not StorageIDs. (Contributed by Milan Desai)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab934e85
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab934e85
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab934e85
Branch: refs/heads/YARN-2928
Commit: ab934e85947dcf2092050023909dd81ae274ff45
Parents: 241336c
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Feb 9 12:17:40 2015 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Feb 9 12:17:40 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/net/NetworkTopology.java | 2 +-
.../net/NetworkTopologyWithNodeGroup.java | 2 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/protocol/LocatedBlock.java | 77 ++++++++++++++----
.../server/blockmanagement/DatanodeManager.java | 2 +
.../protocol/DatanodeInfoWithStorage.java | 59 ++++++++++++++
.../apache/hadoop/hdfs/TestDecommission.java | 10 ++-
.../blockmanagement/TestDatanodeManager.java | 84 ++++++++++++++++++++
8 files changed, 218 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index aaa5ae3..fc8bf52 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -859,7 +859,7 @@ public class NetworkTopology {
// Start off by initializing to off rack
int weight = 2;
if (reader != null) {
- if (reader == node) {
+ if (reader.equals(node)) {
weight = 0;
} else if (isOnSameRack(reader, node)) {
weight = 1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index 13160eb..3de49dc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -254,7 +254,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
// Start off by initializing to off rack
int weight = 3;
if (reader != null) {
- if (reader == node) {
+ if (reader.equals(node)) {
weight = 0;
} else if (isOnSameNodeGroup(reader, node)) {
weight = 1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index eda3744..4396e3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7741. Remove unnecessary synchronized in FSDataInputStream and
HdfsDataInputStream. (yliu)
+ HDFS-7647. DatanodeManager.sortLocatedBlocks sorts DatanodeInfos
+ but not StorageIDs. (Milan Desai via Arpit Agarwal)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 30368f6..7fb2e30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
@@ -41,11 +42,13 @@ public class LocatedBlock {
private final ExtendedBlock b;
private long offset; // offset of the first byte of the block in the file
- private final DatanodeInfo[] locs;
- /** Storage ID for each replica */
- private final String[] storageIDs;
- // Storage type for each replica, if reported.
- private final StorageType[] storageTypes;
+ private final DatanodeInfoWithStorage[] locs;
+ private final boolean hasStorageIDs;
+ private final boolean hasStorageTypes;
+ /** Cached storage ID for each replica */
+ private String[] storageIDs;
+ /** Cached storage type for each replica, if reported. */
+ private StorageType[] storageTypes;
// corrupt flag is true if all of the replicas of a block are corrupt.
// else false. If block has few corrupt replicas, they are filtered and
// their locations are not part of this object
@@ -57,7 +60,8 @@ public class LocatedBlock {
private DatanodeInfo[] cachedLocs;
// Used when there are no locations
- private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
+ private static final DatanodeInfoWithStorage[] EMPTY_LOCS =
+ new DatanodeInfoWithStorage[0];
public LocatedBlock(ExtendedBlock b, DatanodeInfo[] locs) {
this(b, locs, -1, false); // startOffset is unknown
@@ -94,10 +98,22 @@ public class LocatedBlock {
if (locs==null) {
this.locs = EMPTY_LOCS;
} else {
- this.locs = locs;
+ this.locs = new DatanodeInfoWithStorage[locs.length];
+ for(int i = 0; i < locs.length; i++) {
+ DatanodeInfo di = locs[i];
+ DatanodeInfoWithStorage storage = new DatanodeInfoWithStorage(di,
+ storageIDs != null ? storageIDs[i] : null,
+ storageTypes != null ? storageTypes[i] : null);
+ storage.setDependentHostNames(di.getDependentHostNames());
+ storage.setLevel(di.getLevel());
+ storage.setParent(di.getParent());
+ this.locs[i] = storage;
+ }
}
this.storageIDs = storageIDs;
this.storageTypes = storageTypes;
+ this.hasStorageIDs = storageIDs != null;
+ this.hasStorageTypes = storageTypes != null;
if (cachedLocs == null || cachedLocs.length == 0) {
this.cachedLocs = EMPTY_LOCS;
@@ -118,18 +134,53 @@ public class LocatedBlock {
return b;
}
- public DatanodeInfo[] getLocations() {
+ /**
+ * Returns the locations associated with this block. The returned array is not
+ * expected to be modified. If it is, caller must immediately invoke
+ * {@link org.apache.hadoop.hdfs.protocol.LocatedBlock#invalidateCachedStorageInfo}
+ * to invalidate the cached Storage ID/Type arrays.
+ */
+ public DatanodeInfoWithStorage[] getLocations() {
return locs;
}
public StorageType[] getStorageTypes() {
+ if(!hasStorageTypes) {
+ return null;
+ }
+ if(storageTypes != null) {
+ return storageTypes;
+ }
+ storageTypes = new StorageType[locs.length];
+ for(int i = 0; i < locs.length; i++) {
+ storageTypes[i] = locs[i].getStorageType();
+ }
return storageTypes;
}
public String[] getStorageIDs() {
+ if(!hasStorageIDs) {
+ return null;
+ }
+ if(storageIDs != null) {
+ return storageIDs;
+ }
+ storageIDs = new String[locs.length];
+ for(int i = 0; i < locs.length; i++) {
+ storageIDs[i] = locs[i].getStorageID();
+ }
return storageIDs;
}
+ /**
+ * Invalidates the cached StorageID and StorageType information. Must be
+ * called when the locations array is modified.
+ */
+ public void invalidateCachedStorageInfo() {
+ storageIDs = null;
+ storageTypes = null;
+ }
+
public long getStartOffset() {
return offset;
}
@@ -161,9 +212,9 @@ public class LocatedBlock {
return;
}
// Try to re-use a DatanodeInfo already in loc
- for (int i=0; i<locs.length; i++) {
- if (locs[i].equals(loc)) {
- cachedList.add(locs[i]);
+ for (DatanodeInfoWithStorage di : locs) {
+ if (loc.equals(di)) {
+ cachedList.add(di);
cachedLocs = cachedList.toArray(cachedLocs);
return;
}
@@ -187,10 +238,6 @@ public class LocatedBlock {
+ "; corrupt=" + corrupt
+ "; offset=" + offset
+ "; locs=" + Arrays.asList(locs)
- + "; storageIDs=" +
- (storageIDs != null ? Arrays.asList(storageIDs) : null)
- + "; storageTypes=" +
- (storageTypes != null ? Arrays.asList(storageTypes) : null)
+ "}";
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index a33d990..c166e94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -391,6 +391,8 @@ public class DatanodeManager {
}
int activeLen = lastActiveIndex + 1;
networktopology.sortByDistance(client, b.getLocations(), activeLen);
+ // must invalidate cache since we modified locations array
+ b.invalidateCachedStorageInfo();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java
new file mode 100644
index 0000000..ec8c346
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeInfoWithStorage.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+public class DatanodeInfoWithStorage extends DatanodeInfo {
+ private final String storageID;
+ private final StorageType storageType;
+
+ public DatanodeInfoWithStorage(DatanodeInfo from, String storageID,
+ StorageType storageType) {
+ super(from);
+ this.storageID = storageID;
+ this.storageType = storageType;
+ }
+
+ public String getStorageID() {
+ return storageID;
+ }
+
+ public StorageType getStorageType() {
+ return storageType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ // allows this class to be used interchangeably with DatanodeInfo
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ // allows this class to be used interchangeably with DatanodeInfo
+ return super.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "DatanodeInfoWithStorage[" + super.toString() + "," + storageID +
+ "," + storageType + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index 24e5db6..35c0d8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -801,22 +802,23 @@ public class TestDecommission {
ArrayList<String> nodes = new ArrayList<String>();
ArrayList<DatanodeInfo> dnInfos = new ArrayList<DatanodeInfo>();
-
+
+ DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
for (DatanodeInfo datanodeInfo : dnInfos4FirstBlock) {
DatanodeInfo found = datanodeInfo;
for (DatanodeInfo dif: dnInfos4LastBlock) {
if (datanodeInfo.equals(dif)) {
- found = null;
+ found = null;
}
}
if (found != null) {
nodes.add(found.getXferAddr());
- dnInfos.add(found);
+ dnInfos.add(dm.getDatanode(found));
}
}
//decommission one of the 3 nodes which have last block
nodes.add(dnInfos4LastBlock[0].getXferAddr());
- dnInfos.add(dnInfos4LastBlock[0]);
+ dnInfos.add(dm.getDatanode(dnInfos4LastBlock[0]));
writeConfigFile(excludeFile, nodes);
refreshNodes(ns, conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab934e85/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 2c65fff..adf31a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,13 +32,19 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;
public class TestDatanodeManager {
@@ -210,4 +217,81 @@ public class TestDatanodeManager {
public void reloadCachedMappings(List<String> names) {
}
}
+
+ /**
+ * This test creates a LocatedBlock with 5 locations, sorts the locations
+ * based on the network topology, and ensures the locations are still aligned
+ * with the storage ids and storage types.
+ */
+ @Test
+ public void testSortLocatedBlocks() throws IOException {
+ // create the DatanodeManager which will be tested
+ FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+ Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+ DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
+ fsn, new Configuration());
+
+ // register 5 datanodes, each with different storage ID and type
+ DatanodeInfo[] locs = new DatanodeInfo[5];
+ String[] storageIDs = new String[5];
+ StorageType[] storageTypes = new StorageType[]{
+ StorageType.ARCHIVE,
+ StorageType.DEFAULT,
+ StorageType.DISK,
+ StorageType.RAM_DISK,
+ StorageType.SSD
+ };
+ for(int i = 0; i < 5; i++) {
+ // register new datanode
+ String uuid = "UUID-"+i;
+ String ip = "IP-" + i;
+ DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+ Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+ Mockito.when(dr.getIpAddr()).thenReturn(ip);
+ Mockito.when(dr.getXferAddr()).thenReturn(ip + ":9000");
+ Mockito.when(dr.getXferPort()).thenReturn(9000);
+ Mockito.when(dr.getSoftwareVersion()).thenReturn("version1");
+ dm.registerDatanode(dr);
+
+ // get location and storage information
+ locs[i] = dm.getDatanode(uuid);
+ storageIDs[i] = "storageID-"+i;
+ }
+
+ // set first 2 locations as decommissioned
+ locs[0].setDecommissioned();
+ locs[1].setDecommissioned();
+
+ // create LocatedBlock with above locations
+ ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+ LocatedBlock block = new LocatedBlock(b, locs, storageIDs, storageTypes);
+ List<LocatedBlock> blocks = new ArrayList<>();
+ blocks.add(block);
+
+ final String targetIp = locs[4].getIpAddr();
+
+ // sort block locations
+ dm.sortLocatedBlocks(targetIp, blocks);
+
+ // check that storage IDs/types are aligned with datanode locs
+ DatanodeInfoWithStorage[] sortedLocs = block.getLocations();
+ storageIDs = block.getStorageIDs();
+ storageTypes = block.getStorageTypes();
+ assertThat(sortedLocs.length, is(5));
+ assertThat(storageIDs.length, is(5));
+ assertThat(storageTypes.length, is(5));
+ for(int i = 0; i < sortedLocs.length; i++) {
+ assertThat(sortedLocs[i].getStorageID(), is(storageIDs[i]));
+ assertThat(sortedLocs[i].getStorageType(), is(storageTypes[i]));
+ }
+
+ // Ensure the local node is first.
+ assertThat(sortedLocs[0].getIpAddr(), is(targetIp));
+
+ // Ensure the two decommissioned DNs were moved to the end.
+ assertThat(sortedLocs[sortedLocs.length-1].getAdminState(),
+ is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+ assertThat(sortedLocs[sortedLocs.length-2].getAdminState(),
+ is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+ }
}