Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/24 07:32:37 UTC

[42/50] [abbrv] hadoop git commit: HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.
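
The practical effect of this refactor for anything that referenced the SPS classes is a package change from org.apache.hadoop.hdfs.server.namenode to org.apache.hadoop.hdfs.server.namenode.sps, as the diffs below show. A minimal illustration of the resulting import change, using a class name visible in the diffs (other moved classes follow the same pattern):

    // Before HDFS-12955: SPS classes sat alongside the NameNode internals.
    import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;

    // After HDFS-12955: SPS classes live in their own sub-package.
    import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;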

http://git-wip-us.apache.org/repos/asf/hadoop/blob/343d9cbd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
deleted file mode 100644
index 6991ad2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.StripedFileTestUtil;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Supplier;
-
-/**
- * Tests that the StoragePolicySatisfier daemon is able to identify the striped
- * blocks to be moved and to find their expected target locations in order to
- * satisfy the storage policy.
- */
-public class TestStoragePolicySatisfierWithStripedFile {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestStoragePolicySatisfierWithStripedFile.class);
-
-  private final int stripesPerBlock = 2;
-
-  private ErasureCodingPolicy ecPolicy;
-  private int dataBlocks;
-  private int parityBlocks;
-  private int cellSize;
-  private int defaultStripeBlockSize;
-
-  private ErasureCodingPolicy getEcPolicy() {
-    return StripedFileTestUtil.getDefaultECPolicy();
-  }
-
-  /**
-   * Initialize erasure coding policy.
-   */
-  @Before
-  public void init() {
-    ecPolicy = getEcPolicy();
-    dataBlocks = ecPolicy.getNumDataUnits();
-    parityBlocks = ecPolicy.getNumParityUnits();
-    cellSize = ecPolicy.getCellSize();
-    defaultStripeBlockSize = cellSize * stripesPerBlock;
-  }
-
-  /**
-   * Tests to verify that all the striped blocks (data + parity) are moved to
-   * satisfy the storage policy.
-   */
-  @Test(timeout = 300000)
-  public void testMoverWithFullStripe() throws Exception {
-    // start 10 datanodes
-    int numOfDatanodes = 10;
-    int storagesPerDatanode = 2;
-    long capacity = 20 * defaultStripeBlockSize;
-    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
-    for (int i = 0; i < numOfDatanodes; i++) {
-      for (int j = 0; j < storagesPerDatanode; j++) {
-        capacities[i][j] = capacity;
-      }
-    }
-
-    final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
-    initConfWithStripe(conf, defaultStripeBlockSize);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numOfDatanodes)
-        .storagesPerDatanode(storagesPerDatanode)
-        .storageTypes(new StorageType[][]{
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE}})
-        .storageCapacities(capacities)
-        .build();
-
-    HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      dfs.enableErasureCodingPolicy(
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-
-      // set "/bar" directory with HOT storage policy.
-      ClientProtocol client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-      String barDir = "/bar";
-      client.mkdirs(barDir, new FsPermission((short) 777), true);
-      client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
-      // set an EC policy on "/bar" directory
-      client.setErasureCodingPolicy(barDir,
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-
-      // write file to barDir
-      final String fooFile = "/bar/foo";
-      long fileLen = cellSize * dataBlocks;
-      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
-          fileLen, (short) 3, 0);
-
-      // verify storage types and locations
-      LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
-          fileLen);
-      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-        for (StorageType type : lb.getStorageTypes()) {
-          Assert.assertEquals(StorageType.DISK, type);
-        }
-      }
-      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
-          dataBlocks + parityBlocks);
-
-      // start 5 more datanodes
-      int numOfNewDatanodes = 5;
-      capacities = new long[numOfNewDatanodes][storagesPerDatanode];
-      for (int i = 0; i < numOfNewDatanodes; i++) {
-        for (int j = 0; j < storagesPerDatanode; j++) {
-          capacities[i][j] = capacity;
-        }
-      }
-      cluster.startDataNodes(conf, 5,
-          new StorageType[][]{
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE}},
-          true, null, null, null, capacities, null, false, false, false, null);
-      cluster.triggerHeartbeats();
-
-      // move file to ARCHIVE
-      client.setStoragePolicy(barDir, "COLD");
-      hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
-      LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
-      cluster.triggerHeartbeats();
-
-      waitForBlocksMovementAttemptReport(cluster, 9, 60000);
-      // verify storage types and locations
-      waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
-          9, 60000);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * Tests to verify that when only a few target datanodes are available, only
-   * some of the striped blocks can be moved; the others keep waiting for
-   * available nodes.
-   *
-   * For example, we have 3 nodes A(disk, disk), B(disk, disk), C(disk, archive).
-   *
-   * Assume a block with storage locations A(disk), B(disk), C(disk). Now, set
-   * the policy to COLD and invoke {@link HdfsAdmin#satisfyStoragePolicy(Path)};
-   * while choosing the target node for A, it shouldn't choose C. For C, it
-   * should do a local block movement as it already has an ARCHIVE storage type.
-   */
-  @Test(timeout = 300000)
-  public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
-      throws Exception {
-    // start 10 datanodes
-    int numOfDatanodes = 10;
-    int storagesPerDatanode = 2;
-    long capacity = 20 * defaultStripeBlockSize;
-    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
-    for (int i = 0; i < numOfDatanodes; i++) {
-      for (int j = 0; j < storagesPerDatanode; j++) {
-        capacities[i][j] = capacity;
-      }
-    }
-
-    final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
-    initConfWithStripe(conf, defaultStripeBlockSize);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numOfDatanodes)
-        .storagesPerDatanode(storagesPerDatanode)
-        .storageTypes(new StorageType[][]{
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE}})
-        .storageCapacities(capacities)
-        .build();
-
-    HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      dfs.enableErasureCodingPolicy(
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-      // set "/bar" directory with HOT storage policy.
-      ClientProtocol client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-      String barDir = "/bar";
-      client.mkdirs(barDir, new FsPermission((short) 777), true);
-      client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
-      // set an EC policy on "/bar" directory
-      client.setErasureCodingPolicy(barDir,
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-
-      // write file to barDir
-      final String fooFile = "/bar/foo";
-      long fileLen = cellSize * dataBlocks;
-      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
-          fileLen, (short) 3, 0);
-
-      // verify storage types and locations
-      LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
-          fileLen);
-      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-        for (StorageType type : lb.getStorageTypes()) {
-          Assert.assertEquals(StorageType.DISK, type);
-        }
-      }
-      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
-          dataBlocks + parityBlocks);
-
-      // start 2 more datanodes
-      int numOfNewDatanodes = 2;
-      capacities = new long[numOfNewDatanodes][storagesPerDatanode];
-      for (int i = 0; i < numOfNewDatanodes; i++) {
-        for (int j = 0; j < storagesPerDatanode; j++) {
-          capacities[i][j] = capacity;
-        }
-      }
-      cluster.startDataNodes(conf, 2,
-          new StorageType[][]{
-              {StorageType.ARCHIVE, StorageType.ARCHIVE},
-              {StorageType.ARCHIVE, StorageType.ARCHIVE}},
-          true, null, null, null, capacities, null, false, false, false, null);
-      cluster.triggerHeartbeats();
-
-      // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE
-      // storage type.
-      client.setStoragePolicy(barDir, "COLD");
-      hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
-      LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
-      cluster.triggerHeartbeats();
-
-      waitForBlocksMovementAttemptReport(cluster, 5, 60000);
-      waitForAttemptedItems(cluster, 1, 30000);
-      // verify storage types and locations.
-      waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
-          9, 60000);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  /**
-   * Test SPS for low-redundancy file blocks.
-   * 1. Create a cluster with 9 datanodes.
-   * 2. Create one striped file with the default EC policy.
-   * 3. Set the policy and call satisfyStoragePolicy for the file.
-   * 4. Stop the NameNode and the datanodes.
-   * 5. Restart the NameNode with 5 datanodes and wait for block movement.
-   * 6. Start the remaining datanodes.
-   * 7. All replicas should be moved to the proper storage based on the policy.
-   */
-  @Test(timeout = 300000)
-  public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
-    // start 9 datanodes
-    int numOfDatanodes = 9;
-    int storagesPerDatanode = 2;
-    long capacity = 20 * defaultStripeBlockSize;
-    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
-    for (int i = 0; i < numOfDatanodes; i++) {
-      for (int j = 0; j < storagesPerDatanode; j++) {
-        capacities[i][j] = capacity;
-      }
-    }
-
-    final Configuration conf = new HdfsConfiguration();
-    conf.set(DFSConfigKeys
-        .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
-        "3000");
-    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
-    initConfWithStripe(conf, defaultStripeBlockSize);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numOfDatanodes)
-        .storagesPerDatanode(storagesPerDatanode)
-        .storageTypes(new StorageType[][]{
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE},
-            {StorageType.DISK, StorageType.ARCHIVE}})
-        .storageCapacities(capacities)
-        .build();
-    try {
-      cluster.waitActive();
-      DistributedFileSystem fs = cluster.getFileSystem();
-      fs.enableErasureCodingPolicy(
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-      Path barDir = new Path("/bar");
-      fs.mkdirs(barDir);
-      // set an EC policy on "/bar" directory
-      fs.setErasureCodingPolicy(barDir,
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-
-      // write file to barDir
-      final Path fooFile = new Path("/bar/foo");
-      long fileLen = cellSize * dataBlocks;
-      DFSTestUtil.createFile(cluster.getFileSystem(), fooFile,
-          fileLen, (short) 3, 0);
-
-      // Move file to ARCHIVE.
-      fs.setStoragePolicy(barDir, "COLD");
-      // Stop the datanodes and restart the NameNode.
-      List<DataNodeProperties> list = new ArrayList<>(numOfDatanodes);
-      for (int i = 0; i < numOfDatanodes; i++) {
-        list.add(cluster.stopDataNode(0));
-      }
-      cluster.restartNameNodes();
-      // Restart half datanodes
-      for (int i = 0; i < 5; i++) {
-        cluster.restartDataNode(list.get(i), false);
-      }
-      cluster.waitActive();
-      fs.satisfyStoragePolicy(fooFile);
-      DFSTestUtil.waitExpectedStorageType(fooFile.toString(),
-          StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem());
-      // Start the remaining datanodes.
-      for (int i = numOfDatanodes - 1; i >= 5; i--) {
-        cluster.restartDataNode(list.get(i), false);
-      }
-      // verify storage types and locations.
-      waitExpectedStorageType(cluster, fooFile.toString(), fileLen,
-          StorageType.ARCHIVE, 9, 9, 60000);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-
-  /**
-   * Tests to verify that for the given path, no blocks will be scheduled for
-   * movement, as there is no available datanode with the required storage
-   * type.
-   *
-   * For example, there are two blocks for a file:
-   *
-   * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
-   * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set the storage policy to
-   * COLD. No datanode is available with storage type ARCHIVE.
-   *
-   * SPS won't schedule any block movement for this path.
-   */
-  @Test(timeout = 300000)
-  public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
-      throws Exception {
-    // start 10 datanodes
-    int numOfDatanodes = 10;
-    int storagesPerDatanode = 2;
-    long capacity = 20 * defaultStripeBlockSize;
-    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
-    for (int i = 0; i < numOfDatanodes; i++) {
-      for (int j = 0; j < storagesPerDatanode; j++) {
-        capacities[i][j] = capacity;
-      }
-    }
-
-    final Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
-        true);
-    initConfWithStripe(conf, defaultStripeBlockSize);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(numOfDatanodes)
-        .storagesPerDatanode(storagesPerDatanode)
-        .storageTypes(new StorageType[][]{
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK},
-            {StorageType.DISK, StorageType.DISK}})
-        .storageCapacities(capacities)
-        .build();
-
-    HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      dfs.enableErasureCodingPolicy(
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-      // set "/bar" directory with HOT storage policy.
-      ClientProtocol client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-      String barDir = "/bar";
-      client.mkdirs(barDir, new FsPermission((short) 777), true);
-      client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
-      // set an EC policy on "/bar" directory
-      client.setErasureCodingPolicy(barDir,
-          StripedFileTestUtil.getDefaultECPolicy().getName());
-
-      // write file to barDir
-      final String fooFile = "/bar/foo";
-      long fileLen = cellSize * dataBlocks;
-      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
-          fileLen, (short) 3, 0);
-
-      // verify storage types and locations
-      LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
-          fileLen);
-      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-        for (StorageType type : lb.getStorageTypes()) {
-          Assert.assertEquals(StorageType.DISK, type);
-        }
-      }
-      StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
-          dataBlocks + parityBlocks);
-
-      // Set the policy to COLD. No datanode with ARCHIVE storage type is
-      // available, so SPS cannot schedule any block movement.
-      client.setStoragePolicy(barDir, "COLD");
-      hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
-      LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
-      cluster.triggerHeartbeats();
-
-      waitForAttemptedItems(cluster, 1, 30000);
-      // verify storage types and locations.
-      waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.DISK, 9, 9,
-          60000);
-      waitForAttemptedItems(cluster, 1, 30000);
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
-  private void waitForAttemptedItems(MiniDFSCluster cluster,
-      long expectedBlkMovAttemptedCount, int timeout)
-          throws TimeoutException, InterruptedException {
-    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
-            expectedBlkMovAttemptedCount,
-            sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
-        return sps.getAttemptedItemsMonitor()
-            .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
-      }
-    }, 100, timeout);
-  }
-
-  private static void initConfWithStripe(Configuration conf,
-      int stripeBlockSize) {
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, stripeBlockSize);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
-        1L);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
-        false);
-  }
-
-  // Check whether the block movement has completed successfully, satisfying
-  // the storage policy for the given file.
-  private void waitExpectedStorageType(MiniDFSCluster cluster,
-      final String fileName, long fileLen,
-      final StorageType expectedStorageType, int expectedStorageCount,
-      int expectedBlkLocationCount, int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        int actualStorageCount = 0;
-        try {
-          LocatedBlocks locatedBlocks = cluster.getFileSystem().getClient()
-              .getLocatedBlocks(fileName, 0, fileLen);
-          for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-            LOG.info("LocatedBlocks => Size {}, locs {}",
-                lb.getLocations().length, lb);
-            if (lb.getLocations().length > expectedBlkLocationCount) {
-              return false;
-            }
-            for (StorageType storageType : lb.getStorageTypes()) {
-              if (expectedStorageType == storageType) {
-                actualStorageCount++;
-              } else {
-                LOG.info("Expected storage type {} and actual {}",
-                    expectedStorageType, storageType);
-              }
-            }
-          }
-          LOG.info("{} replica count, expected={} and actual={}",
-              expectedStorageType, expectedStorageCount, actualStorageCount);
-        } catch (IOException e) {
-          LOG.error("Exception while getting located blocks", e);
-          return false;
-        }
-        return expectedStorageCount == actualStorageCount;
-      }
-    }, 100, timeout);
-  }
-
-  // Check whether the block movement attempt report has arrived at the
-  // NameNode (SPS).
-  private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
-      long expectedMovementFinishedBlocksCount, int timeout)
-          throws TimeoutException, InterruptedException {
-    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
-    final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
-    Assert.assertNotNull("Failed to get SPS object reference!", sps);
-
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
-            expectedMovementFinishedBlocksCount,
-            sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
-        return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
-            >= expectedMovementFinishedBlocksCount;
-      }
-    }, 100, timeout);
-  }
-}
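
The wait helpers above (waitForAttemptedItems, waitExpectedStorageType and waitForBlocksMovementAttemptReport) all share one eventual-consistency idiom: poll a condition with GenericTestUtils.waitFor until it holds or the timeout expires. A minimal, self-contained sketch of that idiom, assuming the hadoop-common test utilities and Guava on the classpath (the finishedBlocks counter is a hypothetical stand-in for state updated elsewhere, e.g. by SPS heartbeat processing):

    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.hadoop.test.GenericTestUtils;
    import com.google.common.base.Supplier;

    public class WaitForExample {
      public static void main(String[] args)
          throws TimeoutException, InterruptedException {
        // Stand-in for state that another thread advances over time.
        final AtomicLong finishedBlocks = new AtomicLong(9);
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            // Re-evaluated every 100 ms; waitFor throws TimeoutException
            // after 60 s, which surfaces as a test failure.
            return finishedBlocks.get() >= 9;
          }
        }, 100, 60000);
      }
    }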

http://git-wip-us.apache.org/repos/asf/hadoop/blob/343d9cbd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..62766d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests that block storage movement attempt failures reported by the DN are
+ * processed correctly.
+ */
+public class TestBlockStorageMovementAttemptedItems {
+
+  private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
+  private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+  private final int selfRetryTimeout = 500;
+
+  @Before
+  public void setup() throws Exception {
+    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
+        Mockito.mock(Namesystem.class),
+        Mockito.mock(StoragePolicySatisfier.class), 100);
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
+        selfRetryTimeout, unsatisfiedStorageMovementFiles);
+  }
+
+  @After
+  public void teardown() {
+    if (bsmAttemptedItems != null) {
+      bsmAttemptedItems.stop();
+      bsmAttemptedItems.stopGracefully();
+    }
+  }
+
+  private boolean checkItemMovedForRetry(Long item, long retryTimeout)
+      throws InterruptedException {
+    long stopTime = monotonicNow() + (retryTimeout * 2);
+    boolean isItemFound = false;
+    while (monotonicNow() < (stopTime)) {
+      ItemInfo ele = null;
+      while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
+        if (item == ele.getTrackId()) {
+          isItemFound = true;
+          break;
+        }
+      }
+      if (!isItemFound) {
+        Thread.sleep(100);
+      } else {
+        break;
+      }
+    }
+    return isItemFound;
+  }
+
+  /**
+   * Verify that reporting moved blocks queues up the block info.
+   */
+  @Test(timeout = 30000)
+  public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
+    Long item = new Long(1234);
+    List<Block> blocks = new ArrayList<Block>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+    Block[] blockArray = new Block[blocks.size()];
+    blocks.toArray(blockArray);
+    bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+    assertEquals("Failed to receive result!", 1,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
+  }
+
+  /**
+   * Verify that, with no moved-blocks report, nothing is marked finished and
+   * the item stays in the attempted list.
+   */
+  @Test(timeout = 30000)
+  public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
+    bsmAttemptedItems.start(); // start block movement report monitor thread
+    Long item = new Long(1234);
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+    assertEquals("Shouldn't receive result", 0,
+        bsmAttemptedItems.getMovementFinishedBlocksCount());
+    assertEquals("Item doesn't exist in the attempted list", 1,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with
+   * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here,
+   * #blockStorageMovementReportedItemsCheck() runs first, followed by
+   * #blocksStorageMovementUnReportedItemsCheck().
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried1() throws Exception {
+    Long item = new Long(1234);
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    blocks.add(new Block(5678L));
+    Long trackID = 0L;
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+    // start block movement report monitor thread
+    bsmAttemptedItems.start();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(trackID, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement. Here,
+   * #blocksStorageMovementUnReportedItemsCheck() runs first, followed by
+   * #blockStorageMovementReportedItemsCheck().
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried2() throws Exception {
+    Long item = new Long(1234);
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+    Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
+
+    bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
+    bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
+
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(trackID, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement where only the BlocksStorageMoveAttemptFinished
+   * report arrives and the storageMovementAttemptedItems list is empty.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementWithEmptyAttemptedQueue()
+      throws Exception {
+    Long item = new Long(1234);
+    Long trackID = 0L;
+    List<Block> blocks = new ArrayList<>();
+    blocks.add(new Block(item));
+    bsmAttemptedItems
+        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+    Block[] blksMovementReport = new Block[1];
+    blksMovementReport[0] = new Block(item);
+    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+    assertFalse(
+        "Should not add in queue again if it is not there in"
+            + " storageMovementAttemptedItems",
+        checkItemMovedForRetry(trackID, 5000));
+    assertEquals("Failed to remove from the attempted list", 1,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+}
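
For reference, the client-side sequence that the striped-file tests above exercise reduces to two calls: set the storage policy on a path, then ask the satisfier to act on it. A minimal sketch against a running cluster with SPS enabled (the path is a placeholder; the tests turn the satisfier on through DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class SatisfyPolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HdfsAdmin admin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
        Path file = new Path("/bar/foo");  // placeholder path
        // 1. Retarget the path at ARCHIVE storage via the COLD policy.
        admin.setStoragePolicy(file, "COLD");
        // 2. Ask the NameNode-side satisfier to schedule the block moves.
        //    The call returns immediately; movement happens in the
        //    background, which is why the tests poll for completion.
        admin.satisfyStoragePolicy(file);
      }
    }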

