You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/08/27 07:15:50 UTC

[23/50] [abbrv] hadoop git commit: HDFS-11032: [SPS]: Handling of block movement failure at the coordinator datanode. Contributed by Rakesh R

HDFS-11032: [SPS]: Handling of block movement failure at the coordinator datanode. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f18deaee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f18deaee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f18deaee

Branch: refs/heads/HDFS-10285
Commit: f18deaeeb69badb972f1155004e83e2de60f8ef5
Parents: 98ced53
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Thu Dec 22 17:07:49 2016 -0800
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Sun Aug 27 11:54:31 2017 +0530

----------------------------------------------------------------------
 .../datanode/StoragePolicySatisfyWorker.java    |   9 +-
 .../namenode/TestStoragePolicySatisfier.java    | 168 +++++++++++++++----
 2 files changed, 143 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f18deaee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index a69a38b..19f3fe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -251,6 +252,12 @@ public class StoragePolicySatisfyWorker {
                 + " satisfying storageType:{}",
             block, source, target, targetStorageType);
         return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
+      } catch (BlockPinningException e) {
+        // A pinned block can't be moved to a different node, so it's not
+        // required to do retries; it is just marked as SUCCESS.
+        LOG.debug("Pinned block can't be moved, so skipping block:{}", block,
+            e);
+        return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
       } catch (IOException e) {
         // TODO: handle failure retries
         LOG.warn(
@@ -282,7 +289,7 @@ public class StoragePolicySatisfyWorker {
         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
       }
       String logInfo = "reportedBlock move is failed";
-      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
+      DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f18deaee/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index fe23f3e..179b66b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
@@ -36,12 +37,15 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,10 +70,16 @@ public class TestStoragePolicySatisfier {
   final private long capacity = 2 * 256 * 1024 * 1024;
   final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
   private DistributedFileSystem dfs = null;
+  private static final int DEFAULT_BLOCK_SIZE = 1024;
 
-  @Before
-  public void setUp() throws IOException {
-    config.setLong("dfs.block.size", 1024);
+  private void shutdownCluster() {
+    if (hdfsCluster != null) {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void createCluster() throws IOException {
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
     hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
         storagesPerDatanode, capacity);
     dfs = hdfsCluster.getFileSystem();
@@ -81,6 +91,7 @@ public class TestStoragePolicySatisfier {
       throws Exception {
 
     try {
+      createCluster();
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), "COLD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -99,7 +110,7 @@ public class TestStoragePolicySatisfier {
       // Wait till namenode notified about the block location details
       waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -107,6 +118,7 @@ public class TestStoragePolicySatisfier {
   public void testWhenStoragePolicySetToALLSSD()
       throws Exception {
     try {
+      createCluster();
       // Change policy to ALL_SSD
       dfs.setStoragePolicy(new Path(file), "ALL_SSD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -127,7 +139,7 @@ public class TestStoragePolicySatisfier {
       // areas
       waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -135,6 +147,7 @@ public class TestStoragePolicySatisfier {
   public void testWhenStoragePolicySetToONESSD()
       throws Exception {
     try {
+      createCluster();
       // Change policy to ONE_SSD
       dfs.setStoragePolicy(new Path(file), "ONE_SSD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -154,7 +167,7 @@ public class TestStoragePolicySatisfier {
       waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
       waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -165,6 +178,7 @@ public class TestStoragePolicySatisfier {
   @Test(timeout = 300000)
   public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
     try {
+      createCluster();
       // Change policy to ONE_SSD
       dfs.setStoragePolicy(new Path(file), "ONE_SSD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -186,7 +200,7 @@ public class TestStoragePolicySatisfier {
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -196,18 +210,18 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testMultipleFilesForSatisfyStoragePolicy() throws Exception {
-    List<String> files = new ArrayList<>();
-    files.add(file);
-
-    // Creates 4 more files. Send all of them for satisfying the storage policy
-    // together.
-    for (int i = 0; i < 4; i++) {
-      String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
-      files.add(file1);
-      writeContent(file1);
-    }
-
     try {
+      createCluster();
+      List<String> files = new ArrayList<>();
+      files.add(file);
+
+      // Creates 4 more files. Send all of them for satisfying the storage
+      // policy together.
+      for (int i = 0; i < 4; i++) {
+        String file1 = "/testMoveWhenStoragePolicyNotSatisfying_" + i;
+        files.add(file1);
+        writeContent(file1);
+      }
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
       List<Long> blockCollectionIds = new ArrayList<>();
       // Change policy to ONE_SSD
@@ -237,7 +251,7 @@ public class TestStoragePolicySatisfier {
 
       waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -247,10 +261,10 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSatisfyFileWithHdfsAdmin() throws Exception {
-    HdfsAdmin hdfsAdmin =
-        new HdfsAdmin(FileSystem.getDefaultUri(config), config);
     try {
-
+      createCluster();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), "COLD");
 
@@ -267,7 +281,7 @@ public class TestStoragePolicySatisfier {
       // Wait till namenode notified about the block location details
       waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -277,11 +291,10 @@ public class TestStoragePolicySatisfier {
    */
   @Test(timeout = 300000)
   public void testSatisfyDirWithHdfsAdmin() throws Exception {
-    HdfsAdmin hdfsAdmin =
-        new HdfsAdmin(FileSystem.getDefaultUri(config), config);
-
     try {
-
+      createCluster();
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(config), config);
       final String subDir = "/subDir";
       final String subFile1 = subDir + "/subFile1";
       final String subDir2 = subDir + "/subDir2";
@@ -310,7 +323,7 @@ public class TestStoragePolicySatisfier {
       // take no effect for the sub-dir's file in the directory.
       waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -321,6 +334,7 @@ public class TestStoragePolicySatisfier {
   @Test(timeout = 300000)
   public void testSatisfyWithExceptions() throws Exception {
     try {
+      createCluster();
       final String nonExistingFile = "/noneExistingFile";
       hdfsCluster.getConfiguration(0).
           setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, false);
@@ -354,7 +368,7 @@ public class TestStoragePolicySatisfier {
 
       }
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -376,6 +390,7 @@ public class TestStoragePolicySatisfier {
   public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
       throws Exception {
     try {
+      createCluster();
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), "COLD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -397,7 +412,7 @@ public class TestStoragePolicySatisfier {
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -418,6 +433,7 @@ public class TestStoragePolicySatisfier {
   public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
       throws Exception {
     try {
+      createCluster();
       // Change policy to COLD
       dfs.setStoragePolicy(new Path(file), "COLD");
       FSNamesystem namesystem = hdfsCluster.getNamesystem();
@@ -440,7 +456,7 @@ public class TestStoragePolicySatisfier {
       // re-attempted.
       waitForAttemptedItems(1, 30000);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -452,6 +468,7 @@ public class TestStoragePolicySatisfier {
   public void testWhenMoverIsAlreadyRunningBeforeStoragePolicySatisfier()
       throws IOException {
     try {
+      createCluster();
       // Simulate Mover by creating MOVER_ID file
       DFSTestUtil.createFile(hdfsCluster.getFileSystem(),
           HdfsServerConstants.MOVER_ID_PATH, 0, (short) 1, 0);
@@ -461,8 +478,93 @@ public class TestStoragePolicySatisfier {
       Assert.assertFalse("SPS should not start "
           + "when a Mover instance is running", running);
     } finally {
-      hdfsCluster.shutdown();
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test to verify that the satisfy worker can't move pinned blocks. If the
+   * given block is pinned it shouldn't be considered for retries.
+   */
+  @Test(timeout = 120000)
+  public void testMoveWithBlockPinning() throws Exception {
+    config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+    hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+                {StorageType.DISK, StorageType.DISK},
+                {StorageType.DISK, StorageType.DISK}})
+        .build();
+
+    hdfsCluster.waitActive();
+    dfs = hdfsCluster.getFileSystem();
+
+    // Create a file with replication factor 3 and mark 2 of its block
+    // locations as pinned.
+    final String file1 = createFileAndSimulateFavoredNodes(2);
+
+    // Change policy to COLD
+    dfs.setStoragePolicy(new Path(file1), "COLD");
+    FSNamesystem namesystem = hdfsCluster.getNamesystem();
+    INode inode = namesystem.getFSDirectory().getINode(file1);
+
+    StorageType[][] newtypes =
+        new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE},
+            {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+    // Adding ARCHIVE based datanodes
+    startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+        storagesPerDatanode, capacity, hdfsCluster);
+
+    namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+    hdfsCluster.triggerHeartbeats();
+
+    // Two of the three replicas are pinned and can't be moved, so only one
+    // replica is expected to reach ARCHIVE while the other two stay on DISK.
+    waitForAttemptedItems(1, 30000);
+    waitForBlocksMovementResult(1, 30000);
+    waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000);
+    waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
+  }
+
+  private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
+      throws IOException {
+    ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
+    final String file1 = "/testMoveWithBlockPinning";
+    // replication factor 3
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[favoredNodesCount];
+    for (int i = 0; i < favoredNodesCount; i++) {
+      favoredNodes[i] = dns.get(i).getXferAddress();
+    }
+    DFSTestUtil.createFile(dfs, new Path(file1), false, 1024, 100,
+        DEFAULT_BLOCK_SIZE, (short) 3, 0, false, favoredNodes);
+
+    LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file1, 0);
+    Assert.assertEquals("Wrong block count", 1,
+        locatedBlocks.locatedBlockCount());
+
+    // verify storage type before movement
+    LocatedBlock lb = locatedBlocks.get(0);
+    StorageType[] storageTypes = lb.getStorageTypes();
+    for (StorageType storageType : storageTypes) {
+      Assert.assertTrue(StorageType.DISK == storageType);
+    }
+
+    // Mock FsDatasetSpi#getPinning to show that the block is pinned.
+    DatanodeInfo[] locations = lb.getLocations();
+    Assert.assertEquals(3, locations.length);
+    Assert.assertTrue(favoredNodesCount < locations.length);
+    for(DatanodeInfo dnInfo: locations){
+      LOG.info("Simulate block pinning in datanode {}",
+          locations[favoredNodesCount]);
+      DataNode dn = hdfsCluster.getDataNode(dnInfo.getIpcPort());
+      DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
+      favoredNodesCount--;
+      if (favoredNodesCount <= 0) {
+        break;// marked favoredNodesCount number of pinned block location
+      }
     }
+    return file1;
   }
 
   private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org