You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/08/27 07:15:52 UTC

[25/50] [abbrv] hadoop git commit: HDFS-11193 : [SPS]: Erasure coded files should be considered for satisfying storage policy. Contributed by Rakesh R

HDFS-11193 : [SPS]: Erasure coded files should be considered for satisfying storage policy. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3b1c2624
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3b1c2624
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3b1c2624

Branch: refs/heads/HDFS-10285
Commit: 3b1c262458366a8e1e3c39c722ae7cf9115337d4
Parents: a09c7d4
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Thu Jan 5 09:30:39 2017 -0800
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Sun Aug 27 11:54:41 2017 +0530

----------------------------------------------------------------------
 .../blockmanagement/BlockInfoStriped.java       |  10 +
 .../server/namenode/StoragePolicySatisfier.java |  76 ++-
 ...stStoragePolicySatisfierWithStripedFile.java | 469 +++++++++++++++++++
 3 files changed, 551 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b1c2624/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 790cd77..8bc63c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -244,6 +244,10 @@ public class BlockInfoStriped extends BlockInfo {
     return true;
   }
 
+  /**
+   * This class contains datanode storage information and block index in the
+   * block group.
+   */
   public static class StorageAndBlockIndex {
     private final DatanodeStorageInfo storage;
     private final byte blockIndex;
@@ -253,10 +257,16 @@ public class BlockInfoStriped extends BlockInfo {
       this.blockIndex = blockIndex;
     }
 
+    /**
+     * @return storage in the datanode.
+     */
     public DatanodeStorageInfo getStorage() {
       return storage;
     }
 
+    /**
+     * @return block index in the block group.
+     */
     public byte getBlockIndex() {
       return blockIndex;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b1c2624/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 26e0775..a854bd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.balancer.Matcher;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,8 +229,26 @@ public class StoragePolicySatisfier implements Runnable {
 
     for (int i = 0; i < blocks.length; i++) {
       BlockInfo blockInfo = blocks[i];
-      List<StorageType> expectedStorageTypes = existingStoragePolicy
+      List<StorageType> expectedStorageTypes;
+      if (blockInfo.isStriped()) {
+        if (ErasureCodingPolicyManager
+            .checkStoragePolicySuitableForECStripedMode(
+                existingStoragePolicyID)) {
+          expectedStorageTypes = existingStoragePolicy
+              .chooseStorageTypes((short) blockInfo.getCapacity());
+        } else {
+          // Currently only a limited set of policies (HOT, COLD, ALLSSD) is
+          // supported for EC striped mode files. SPS skips moving the blocks
+          // if the storage policy is not one of those supported policies.
+          LOG.warn("The storage policy " + existingStoragePolicy.getName()
+              + " is not suitable for Striped EC files. "
+              + "So, ignoring to move the blocks");
+          return false;
+        }
+      } else {
+        expectedStorageTypes = existingStoragePolicy
             .chooseStorageTypes(blockInfo.getReplication());
+      }
       foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos(
           blockMovingInfos, blockInfo, expectedStorageTypes);
     }
@@ -439,12 +460,18 @@ public class StoragePolicySatisfier implements Runnable {
     if (sourceNodes.size() <= 0) {
       return blkMovingInfos;
     }
-    buildBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
-        targetNodes, targetStorageTypes, blkMovingInfos);
+
+    if (blockInfo.isStriped()) {
+      buildStripedBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
+          targetNodes, targetStorageTypes, blkMovingInfos);
+    } else {
+      buildContinuousBlockMovingInfos(blockInfo, sourceNodes,
+          sourceStorageTypes, targetNodes, targetStorageTypes, blkMovingInfos);
+    }
     return blkMovingInfos;
   }
 
-  private void buildBlockMovingInfos(BlockInfo blockInfo,
+  private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
       List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
       List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
       List<BlockMovingInfo> blkMovingInfos) {
@@ -458,6 +485,47 @@ public class StoragePolicySatisfier implements Runnable {
     blkMovingInfos.add(blkMovingInfo);
   }
 
+  private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
+      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      List<BlockMovingInfo> blkMovingInfos) {
+    // For a striped block, it needs to construct internal block at the given
+    // index of a block group. Here it is iterating over all the block indices
+    // and construct internal blocks which can be then considered for block
+    // movement.
+    BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
+    for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
+      if (si.getBlockIndex() >= 0) {
+        DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
+        DatanodeInfo[] srcNode = new DatanodeInfo[1];
+        StorageType[] srcStorageType = new StorageType[1];
+        DatanodeInfo[] targetNode = new DatanodeInfo[1];
+        StorageType[] targetStorageType = new StorageType[1];
+        for (int i = 0; i < sourceNodes.size(); i++) {
+          DatanodeInfo node = sourceNodes.get(i);
+          if (node.equals(dn)) {
+            srcNode[0] = node;
+            srcStorageType[0] = sourceStorageTypes.get(i);
+            targetNode[0] = targetNodes.get(i);
+            targetStorageType[0] = targetStorageTypes.get(i);
+
+            // construct internal block
+            long blockId = blockInfo.getBlockId() + si.getBlockIndex();
+            long numBytes = StripedBlockUtil.getInternalBlockLength(
+                sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
+                sBlockInfo.getDataBlockNum(), si.getBlockIndex());
+            Block blk = new Block(blockId, numBytes,
+                blockInfo.getGenerationStamp());
+            BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, srcNode,
+                targetNode, srcStorageType, targetStorageType);
+            blkMovingInfos.add(blkMovingInfo);
+            break; // found matching source-target nodes
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Choose the target storage within same datanode if possible.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b1c2624/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
new file mode 100644
index 0000000..5f8639f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests that StoragePolicySatisfier daemon is able to check the striped blocks
+ * to be moved and finding its expected target locations in order to satisfy the
+ * storage policy.
+ */
+public class TestStoragePolicySatisfierWithStripedFile {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestStoragePolicySatisfierWithStripedFile.class);
+
+  private final int stripesPerBlock = 2;
+
+  private ErasureCodingPolicy ecPolicy;
+  private int dataBlocks;
+  private int parityBlocks;
+  private int cellSize;
+  private int defaultStripeBlockSize;
+
+  /**
+   * @return the policy reported by
+   *         {@code ErasureCodingPolicyManager.getSystemDefaultPolicy()}.
+   */
+  private ErasureCodingPolicy getEcPolicy() {
+    ErasureCodingPolicy systemDefault =
+        ErasureCodingPolicyManager.getSystemDefaultPolicy();
+    return systemDefault;
+  }
+
+  /**
+   * Initialize erasure coding policy.
+   */
+  @Before
+  public void init(){
+    ecPolicy = getEcPolicy();
+    dataBlocks = ecPolicy.getNumDataUnits();
+    parityBlocks = ecPolicy.getNumParityUnits();
+    cellSize = ecPolicy.getCellSize();
+    defaultStripeBlockSize = cellSize * stripesPerBlock;
+  }
+
+  /**
+   * Tests to verify that all the striped blocks(data + parity blocks) are
+   * moving to satisfy the storage policy.
+   *
+   * The file is written under the HOT policy while every block location is
+   * on DISK. Five datanodes with only ARCHIVE storages are then added, the
+   * policy is switched to COLD and the satisfier is invoked; all 9 block
+   * locations are expected to end up on ARCHIVE.
+   */
+  @Test(timeout = 300000)
+  public void testMoverWithFullStripe() throws Exception {
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    // long arithmetic so the product cannot overflow int
+    long capacity = 20L * defaultStripeBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf, defaultStripeBlockSize);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      // Created inside the try block so that the cluster is still shut down
+      // when the admin client cannot be constructed.
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      cluster.waitActive();
+
+      // set "/bar" directory with HOT storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      // 0777 is an octal literal (rwxrwxrwx); decimal 777 sets other bits.
+      client.mkdirs(barDir, new FsPermission((short) 0777), true);
+      client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
+      // set an EC policy on "/bar" directory
+      client.setErasureCodingPolicy(barDir, null);
+
+      // write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = (long) cellSize * dataBlocks;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
+          fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // start 5 more datanodes
+      int numOfNewDatanodes = 5;
+      capacities = new long[numOfNewDatanodes][storagesPerDatanode];
+      for (int i = 0; i < numOfNewDatanodes; i++) {
+        for (int j = 0; j < storagesPerDatanode; j++) {
+          capacities[i][j] = capacity;
+        }
+      }
+      // node count comes from the same variable that sized "capacities"
+      cluster.startDataNodes(conf, numOfNewDatanodes,
+          new StorageType[][]{
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+          true, null, null, null, capacities, null, false, false, false, null);
+      cluster.triggerHeartbeats();
+
+      // move file to ARCHIVE
+      client.setStoragePolicy(barDir, "COLD");
+      hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
+      LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
+      cluster.triggerHeartbeats();
+
+      waitForBlocksMovementResult(cluster, 1, 60000);
+      // verify storage types and locations
+      waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
+          9, 60000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests to verify that only few datanodes are available and few striped
+   * blocks are able to move. Others are still trying to find available nodes.
+   *
+   * For example, we have 3 nodes A(disk, disk), B(disk, disk), C(disk, archive)
+   *
+   * Assume a block with storage locations A(disk), B(disk), C(disk). Now, set
+   * policy as COLD and invoked {@link HdfsAdmin#satisfyStoragePolicy(Path)},
+   * while choosing the target node for A, it shouldn't choose C. For C, it
+   * should do local block movement as it has ARCHIVE storage type.
+   *
+   * In this test three of the initial datanodes have an ARCHIVE storage and
+   * two ARCHIVE-only datanodes are added later, so 5 of the 9 block locations
+   * are expected on ARCHIVE.
+   */
+  @Test(timeout = 300000)
+  public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
+      throws Exception {
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    // long arithmetic so the product cannot overflow int
+    long capacity = 20L * defaultStripeBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf, defaultStripeBlockSize);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      // Created inside the try block so that the cluster is still shut down
+      // when the admin client cannot be constructed.
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      cluster.waitActive();
+
+      // set "/bar" directory with HOT storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      // 0777 is an octal literal (rwxrwxrwx); decimal 777 sets other bits.
+      client.mkdirs(barDir, new FsPermission((short) 0777), true);
+      client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
+      // set an EC policy on "/bar" directory
+      client.setErasureCodingPolicy(barDir, null);
+
+      // write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = (long) cellSize * dataBlocks;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
+          fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // start 2 more datanodes
+      int numOfNewDatanodes = 2;
+      capacities = new long[numOfNewDatanodes][storagesPerDatanode];
+      for (int i = 0; i < numOfNewDatanodes; i++) {
+        for (int j = 0; j < storagesPerDatanode; j++) {
+          capacities[i][j] = capacity;
+        }
+      }
+      // node count comes from the same variable that sized "capacities"
+      cluster.startDataNodes(conf, numOfNewDatanodes,
+          new StorageType[][]{
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+          true, null, null, null, capacities, null, false, false, false, null);
+      cluster.triggerHeartbeats();
+
+      // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE
+      // storage type.
+      client.setStoragePolicy(barDir, "COLD");
+      hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
+      LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
+      cluster.triggerHeartbeats();
+
+      waitForBlocksMovementResult(cluster, 1, 60000);
+      waitForAttemptedItems(cluster, 1, 30000);
+      // verify storage types and locations.
+      waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
+          9, 60000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests to verify that for the given path, no blocks under the given path
+   * will be scheduled for block movement as there are no available datanode
+   * with required storage type.
+   *
+   * For example, there are two block for a file:
+   *
+   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+   * No datanode is available with storage type ARCHIVE.
+   *
+   * SPS won't schedule any block movement for this path.
+   */
+  @Test(timeout = 300000)
+  public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
+      throws Exception {
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    // long arithmetic so the product cannot overflow int
+    long capacity = 20L * defaultStripeBlockSize;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf, defaultStripeBlockSize);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      // Created inside the try block so that the cluster is still shut down
+      // when the admin client cannot be constructed.
+      HdfsAdmin hdfsAdmin =
+          new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
+      cluster.waitActive();
+
+      // set "/bar" directory with HOT storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      // 0777 is an octal literal (rwxrwxrwx); decimal 777 sets other bits.
+      client.mkdirs(barDir, new FsPermission((short) 0777), true);
+      client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
+      // set an EC policy on "/bar" directory
+      client.setErasureCodingPolicy(barDir, null);
+
+      // write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = (long) cellSize * dataBlocks;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
+          fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // Request a move to ARCHIVE. No datanode in this cluster has an ARCHIVE
+      // storage, so the blocks are expected to stay on DISK.
+      client.setStoragePolicy(barDir, "COLD");
+      hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
+      LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
+      cluster.triggerHeartbeats();
+
+      waitForAttemptedItems(cluster, 1, 30000);
+      // verify storage types and locations.
+      waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.DISK, 9, 9,
+          60000);
+      // the attempted-items count is checked again after the storage check
+      waitForAttemptedItems(cluster, 1, 30000);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void waitForAttemptedItems(MiniDFSCluster cluster,
+      long expectedBlkMovAttemptedCount, int timeout)
+          throws TimeoutException, InterruptedException {
+    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
+            expectedBlkMovAttemptedCount,
+            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
+        return sps.getAttemptedItemsMonitor()
+            .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+      }
+    }, 100, timeout);
+  }
+
+  private static void initConfWithStripe(Configuration conf,
+      int stripeBlockSize) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, stripeBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
+        1L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+        false);
+  }
+
+  /**
+   * Waits until the located blocks of the given file report the expected
+   * number of locations on the expected storage type, which indicates that
+   * the block movement has satisfied the storage policy.
+   *
+   * A poll is treated as unsuccessful when the located blocks cannot be
+   * fetched or when any located block reports more locations than
+   * {@code expectedBlkLocationCount}.
+   *
+   * @param cluster cluster used to obtain the client
+   * @param fileName path of the file to inspect
+   * @param fileLen length of the range whose block locations are fetched
+   * @param expectedStorageType storage type to count
+   * @param expectedStorageCount number of locations expected on that type
+   * @param expectedBlkLocationCount upper bound of locations per block
+   * @param timeout passed to {@code GenericTestUtils.waitFor} as the wait
+   *          limit
+   */
+  private void waitExpectedStorageType(MiniDFSCluster cluster,
+      final String fileName, long fileLen,
+      final StorageType expectedStorageType, int expectedStorageCount,
+      int expectedBlkLocationCount, int timeout) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final LocatedBlocks fileBlocks;
+        try {
+          fileBlocks = cluster.getFileSystem().getClient()
+              .getLocatedBlocks(fileName, 0, fileLen);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+
+        int matched = 0;
+        for (LocatedBlock blk : fileBlocks.getLocatedBlocks()) {
+          LOG.info("LocatedBlocks => Size {}, locs {}",
+              blk.getLocations().length, blk);
+          if (blk.getLocations().length > expectedBlkLocationCount) {
+            return false;
+          }
+          for (StorageType actualType : blk.getStorageTypes()) {
+            if (expectedStorageType != actualType) {
+              LOG.info("Expected storage type {} and actual {}",
+                  expectedStorageType, actualType);
+              continue;
+            }
+            matched++;
+          }
+        }
+        LOG.info(
+            expectedStorageType + " replica count, expected={} and actual={}",
+            expectedStorageCount, matched);
+        return matched == expectedStorageCount;
+      }
+    }, 100, timeout);
+  }
+
+  // Check whether the block movement result has been arrived at the
+  // Namenode(SPS).
+  private void waitForBlocksMovementResult(MiniDFSCluster cluster,
+      long expectedBlkMovResultsCount, int timeout)
+          throws TimeoutException, InterruptedException {
+    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
+    Assert.assertNotNull("Failed to get SPS object reference!", sps);
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedResultsCount={} actualResultsCount={}",
+            expectedBlkMovResultsCount,
+            sps.getAttemptedItemsMonitor().resultsCount());
+        return sps.getAttemptedItemsMonitor()
+            .resultsCount() == expectedBlkMovResultsCount;
+      }
+    }, 100, timeout);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org