You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/08/27 07:15:50 UTC
[23/50] [abbrv] hadoop git commit: HDFS-11032: [SPS]: Handling of
block movement failure at the coordinator datanode. Contributed by Rakesh R
HDFS-11032: [SPS]: Handling of block movement failure at the coordinator datanode. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f18deaee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f18deaee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f18deaee
Branch: refs/heads/HDFS-10285
Commit: f18deaeeb69badb972f1155004e83e2de60f8ef5
Parents: 98ced53
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Thu Dec 22 17:07:49 2016 -0800
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Sun Aug 27 11:54:31 2017 +0530
----------------------------------------------------------------------
.../datanode/StoragePolicySatisfyWorker.java | 9 +-
.../namenode/TestStoragePolicySatisfier.java | 168 +++++++++++++++----
2 files changed, 143 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f18deaee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index a69a38b..19f3fe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -251,6 +252,12 @@ public class StoragePolicySatisfyWorker {
+ " satisfying storageType:{}",
block, source, target, targetStorageType);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+ } catch (BlockPinningException e) {
+ // A pinned block cannot be moved to a different node, so retrying is
+ // not required; it is simply reported as SUCCESS.
+ LOG.debug("Pinned block can't be moved, so skipping block:{}", block,
+ e);
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
@@ -282,7 +289,7 @@ public class StoragePolicySatisfyWorker {
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
String logInfo = "reportedBlock move is failed";
- DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+ DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f18deaee/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index fe23f3e..179b66b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
@@ -36,12 +37,15 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,10 +70,16 @@ public class TestStoragePolicySatisfier {
final private long capacity = 2 * 256 * 1024 * 1024;
final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
private DistributedFileSystem dfs = null;
+ private static final int DEFAULT_BLOCK_SIZE = 1024;
- @Before
- public void setUp() throws IOException {
- config.setLong("dfs.block.size", 1024);
+ private void shutdownCluster() {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ private void createCluster() throws IOException {
+ config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
storagesPerDatanode, capacity);
dfs = hdfsCluster.getFileSystem();
@@ -81,6 +91,7 @@ public class TestStoragePolicySatisfier {
throws Exception {
try {
+ createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -99,7 +110,7 @@ public class TestStoragePolicySatisfier {
// Wait till namenode notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -107,6 +118,7 @@ public class TestStoragePolicySatisfier {
public void testWhenStoragePolicySetToALLSSD()
throws Exception {
try {
+ createCluster();
// Change policy to ALL_SSD
dfs.setStoragePolicy(new Path(file), "ALL_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -127,7 +139,7 @@ public class TestStoragePolicySatisfier {
// areas
waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -135,6 +147,7 @@ public class TestStoragePolicySatisfier {
public void testWhenStoragePolicySetToONESSD()
throws Exception {
try {
+ createCluster();
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), "ONE_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -154,7 +167,7 @@ public class TestStoragePolicySatisfier {
waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -165,6 +178,7 @@ public class TestStoragePolicySatisfier {
@Test(timeout = 300000)
public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
try {
+ createCluster();
// Change policy to ONE_SSD
dfs.setStoragePolicy(new Path(file), "ONE_SSD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -186,7 +200,7 @@ public class TestStoragePolicySatisfier {
waitForBlocksMovementResult(1, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -196,18 +210,18 @@ public class TestStoragePolicySatisfier {
*/
@Test(timeout = 300000)
public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
- List<String> files = new ArrayList<>();
- files.add(file);
-
- // Creates 4 more files. Send all of them for satisfying the storage policy
- // together.
- for (int i = 0; i < 4; i++) {
- String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
- files.add(file1);
- writeContent(file1);
- }
-
try {
+ createCluster();
+ List<String> files = new ArrayList<>();
+ files.add(file);
+
+ // Creates 4 more files. Send all of them for satisfying the storage
+ // policy together.
+ for (int i = 0; i < 4; i++) {
+ String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
+ files.add(file1);
+ writeContent(file1);
+ }
FSNamesystem namesystem = hdfsCluster.getNamesystem();
List<Long> blockCollectionIds = new ArrayList<>();
// Change policy to ONE_SSD
@@ -237,7 +251,7 @@ public class TestStoragePolicySatisfier {
waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -247,10 +261,10 @@ public class TestStoragePolicySatisfier {
*/
@Test(timeout = 300000)
public void testSatisfyFileWithHdfsAdmin() throws Exception {
- HdfsAdmin hdfsAdmin =
- new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
-
+ createCluster();
+ HdfsAdmin hdfsAdmin =
+ new HdfsAdmin(FileSystem.getDefaultUri(config), config);
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
@@ -267,7 +281,7 @@ public class TestStoragePolicySatisfier {
// Wait till namenode notified about the block location details
waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -277,11 +291,10 @@ public class TestStoragePolicySatisfier {
*/
@Test(timeout = 300000)
public void testSatisfyDirWithHdfsAdmin() throws Exception {
- HdfsAdmin hdfsAdmin =
- new HdfsAdmin(FileSystem.getDefaultUri(config), config);
-
try {
-
+ createCluster();
+ HdfsAdmin hdfsAdmin =
+ new HdfsAdmin(FileSystem.getDefaultUri(config), config);
final String subDir = "/subDir";
final String subFile1 = subDir + "/subFile1";
final String subDir2 = subDir + "/subDir2";
@@ -310,7 +323,7 @@ public class TestStoragePolicySatisfier {
// take no effect for the sub-dir's file in the directory.
waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -321,6 +334,7 @@ public class TestStoragePolicySatisfier {
@Test(timeout = 300000)
public void testSatisfyWithExceptions() throws Exception {
try {
+ createCluster();
final String nonExistingFile = "/noneExistingFile";
hdfsCluster.getConfiguration(0).
setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
@@ -354,7 +368,7 @@ public class TestStoragePolicySatisfier {
}
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -376,6 +390,7 @@ public class TestStoragePolicySatisfier {
public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
throws Exception {
try {
+ createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -397,7 +412,7 @@ public class TestStoragePolicySatisfier {
waitForBlocksMovementResult(1, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -418,6 +433,7 @@ public class TestStoragePolicySatisfier {
public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
throws Exception {
try {
+ createCluster();
// Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -440,7 +456,7 @@ public class TestStoragePolicySatisfier {
// re-attempted.
waitForAttemptedItems(1, 30000);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -452,6 +468,7 @@ public class TestStoragePolicySatisfier {
public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
throws IOException {
try {
+ createCluster();
// Simulate Mover by creating MOVER_ID file
DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
@@ -461,8 +478,93 @@ public class TestStoragePolicySatisfier {
Assert.assertFalse("SPS should not start "
+ "when a Mover instance is running", running);
} finally {
- hdfsCluster.shutdown();
+ shutdownCluster();
+ }
+ }
+
+ /**
+ * Test to verify that the satisfy worker can't move pinned blocks. A pinned
+ * block shouldn't be considered for retries.
+ */
+ @Test(timeout = 120000)
+ public void testMoveWithBlockPinning() throws Exception {
+ config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+ hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
+ .storageTypes(
+ new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK}})
+ .build();
+
+ hdfsCluster.waitActive();
+ dfs = hdfsCluster.getFileSystem();
+
+ // Create a file with replication factor 3 and mark 2 of its block
+ // locations as pinned.
+ final String file1 = createFileAndSimulateFavoredNodes(2);
+
+ // Change policy to COLD
+ dfs.setStoragePolicy(new Path(file1), "COLD");
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file1);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+ // Adding ARCHIVE based datanodes
+ startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+
+ // The two pinned replicas cannot be moved, so only one replica is expected
+ // on ARCHIVE while the other two remain on DISK.
+ waitForAttemptedItems(1, 30000);
+ waitForBlocksMovementResult(1, 30000);
+ waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000);
+ waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
+ }
+
+ private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
+ throws IOException {
+ ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
+ final String file1 = "/testMoveWithBlockPinning";
+ // replication factor 3
+ InetSocketAddress[] favoredNodes = new InetSocketAddress[favoredNodesCount];
+ for (int i = 0; i < favoredNodesCount; i++) {
+ favoredNodes[i] = dns.get(i).getXferAddress();
+ }
+ DFSTestUtil.createFile(dfs, new Path(file1), false, 1024, 100,
+ DEFAULT_BLOCK_SIZE, (short) 3, 0, false, favoredNodes);
+
+ LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
+ Assert.assertEquals("Wrong block count", 1,
+ locatedBlocks.locatedBlockCount());
+
+ // verify storage type before movement
+ LocatedBlock lb = locatedBlocks.get(0);
+ StorageType[] storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.DISK == storageType);
+ }
+
+ // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+ DatanodeInfo[] locations = lb.getLocations();
+ Assert.assertEquals(3, locations.length);
+ Assert.assertTrue(favoredNodesCount < locations.length);
+ for(DatanodeInfo dnInfo: locations){
+ LOG.info("Simulate block pinning in datanode {}",
+ locations[favoredNodesCount]);
+ DataNode dn = hdfsCluster.getDataNode(dnInfo.getIpcPort());
+ DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+ favoredNodesCount--;
+ if (favoredNodesCount <= 0) {
+ break;// marked favoredNodesCount number of pinned block location
+ }
}
+ return file1;
}
private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org