Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:11 UTC

[43/50] [abbrv] hadoop git commit: HDFS-10884: [SPS]: Add block movement tracker to track the completion of block movement future tasks at DN. Contributed by Rakesh R

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22426183
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22426183
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22426183

Branch: refs/heads/HDFS-10285
Commit: 224261837625cc14728ad73e6ca707ca91e7ccf6
Parents: a706199
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Oct 25 00:40:45 2016 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:19 2016 +0530

----------------------------------------------------------------------
 .../datanode/BlockStorageMovementTracker.java   | 146 ++++++++++++++
 .../datanode/StoragePolicySatisfyWorker.java    | 178 +++++++++++++----
 .../protocol/BlockStorageMovementCommand.java   |  12 +-
 .../TestStoragePolicySatisfyWorker.java         | 190 +++++++++++++------
 4 files changed, 427 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
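In short, this patch moves block-movement tracking off the caller's thread: each block-move task is submitted to a CompletionService, and a dedicated tracker thread drains the completed futures, groups the results by trackId, and fires a completion handler once every task under that trackId has finished. The following minimal, self-contained Java sketch illustrates that pattern only; all names (TrackerSketch, Result) are illustrative stand-ins, not the patch's actual classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TrackerSketch {
  static class Result {
    final long trackId;
    final boolean success;
    Result(long trackId, boolean success) {
      this.trackId = trackId;
      this.success = success;
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<Result> cs = new ExecutorCompletionService<>(pool);
    final long trackId = 42L;
    final int tasks = 3;

    // Submit a group of "block move" tasks under one trackId.
    for (int i = 0; i < tasks; i++) {
      cs.submit(() -> new Result(trackId, true));
    }

    // Tracker loop: take() blocks until any submitted task completes, so
    // results arrive in completion order, not submission order.
    Map<Long, List<Result>> perTrack = new HashMap<>();
    for (int done = 0; done < tasks; done++) {
      Result r = cs.take().get();
      List<Result> results = perTrack.get(r.trackId);
      if (results == null) {
        results = new ArrayList<>();
        perTrack.put(r.trackId, results);
      }
      results.add(r);
      if (results.size() == tasks) {
        // Every task under this trackId has finished; this is the point
        // where the patch invokes its completion handler.
        System.out.println("trackId " + r.trackId + " complete, "
            + results.size() + " results");
        perTrack.remove(r.trackId);
      }
    }
    pool.shutdown();
  }
}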


http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
new file mode 100644
index 0000000..d31f075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to track the completion of block movement future tasks.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class BlockStorageMovementTracker implements Runnable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BlockStorageMovementTracker.class);
+  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final BlocksMovementsCompletionHandler blksMovementsCompletionHandler;
+
+  // Keeps the information - trackID vs its list of blocks
+  private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
+  private final Map<Long, List<BlockMovementResult>> movementResults;
+
+  /**
+   * BlockStorageMovementTracker constructor.
+   *
+   * @param moverCompletionService
+   *          completion service.
+   * @param handler
+   *          blocks movements completion handler
+   */
+  public BlockStorageMovementTracker(
+      CompletionService<BlockMovementResult> moverCompletionService,
+      BlocksMovementsCompletionHandler handler) {
+    this.moverCompletionService = moverCompletionService;
+    this.moverTaskFutures = new HashMap<>();
+    this.blksMovementsCompletionHandler = handler;
+    this.movementResults = new HashMap<>();
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      if (moverTaskFutures.isEmpty()) {
+        try {
+          synchronized (moverTaskFutures) {
+            // Waiting for mover tasks.
+            moverTaskFutures.wait(2000);
+          }
+        } catch (InterruptedException ignore) {
+          // ignore
+        }
+      }
+      try {
+        Future<BlockMovementResult> future = moverCompletionService.take();
+        if (future != null) {
+          BlockMovementResult result = future.get();
+          LOG.debug("Completed block movement. {}", result);
+          long trackId = result.getTrackId();
+          List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
+              .get(trackId);
+          blocksMoving.remove(future);
+
+          List<BlockMovementResult> resultPerTrackIdList =
+              addMovementResultToTrackIdList(result);
+
+          // Completed all the scheduled blocks movement under this 'trackId'.
+          if (blocksMoving.isEmpty()) {
+            synchronized (moverTaskFutures) {
+              moverTaskFutures.remove(trackId);
+            }
+            // handle completed blocks movements per trackId.
+            blksMovementsCompletionHandler.handle(resultPerTrackIdList);
+            movementResults.remove(trackId);
+          }
+        }
+      } catch (ExecutionException | InterruptedException e) {
+        // TODO: Decide whether failure retries are needed and, if so,
+        // implement them.
+        LOG.error("Exception while moving block replica to target storage type",
+            e);
+      }
+    }
+  }
+
+  private List<BlockMovementResult> addMovementResultToTrackIdList(
+      BlockMovementResult result) {
+    long trackId = result.getTrackId();
+    List<BlockMovementResult> perTrackIdList = movementResults.get(trackId);
+    if (perTrackIdList == null) {
+      perTrackIdList = new ArrayList<>();
+      movementResults.put(trackId, perTrackIdList);
+    }
+    perTrackIdList.add(result);
+    return perTrackIdList;
+  }
+
+  /**
+   * Add a future task to the tracking list so that the completion status of
+   * the block movement can be checked.
+   *
+   * @param trackID
+   *          tracking Id
+   * @param futureTask
+   *          future task used for moving the respective block
+   */
+  void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
+    synchronized (moverTaskFutures) {
+      List<Future<BlockMovementResult>> futures = moverTaskFutures
+          .get(Long.valueOf(trackID));
+      // null for the first task
+      if (futures == null) {
+        futures = new ArrayList<>();
+        moverTaskFutures.put(trackID, futures);
+      }
+      futures.add(futureTask);
+      // Notify waiting tracker thread about the newly added tasks.
+      moverTaskFutures.notify();
+    }
+  }
+}

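Within the tracker above, addBlock() and the run() loop form a small producer/consumer handoff over the shared futures map: the tracker parks on the map when it has nothing to track, and each addBlock() call wakes it. Below is a hedged, plain-Java sketch of that handoff with illustrative names; for simplicity the empty-check sits inside the synchronized block, and the bounded wait mirrors the patch's wait(2000).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HandoffSketch {
  private final Map<Long, List<String>> pending = new HashMap<>();

  void addTask(long trackId, String task) {
    synchronized (pending) {
      List<String> tasks = pending.get(trackId);
      if (tasks == null) {
        tasks = new ArrayList<>();
        pending.put(trackId, tasks);
      }
      tasks.add(task);
      // Wake the tracker thread if it is parked in takeOne().
      pending.notify();
    }
  }

  String takeOne() throws InterruptedException {
    synchronized (pending) {
      while (pending.isEmpty()) {
        // Bounded wait, mirroring the patch's wait(2000): re-check
        // periodically even if no notify ever arrives.
        pending.wait(2000);
      }
      Long anyId = pending.keySet().iterator().next();
      List<String> tasks = pending.get(anyId);
      String task = tasks.remove(tasks.size() - 1);
      if (tasks.isEmpty()) {
        pending.remove(anyId);
      }
      return task;
    }
  }

  public static void main(String[] args) throws Exception {
    final HandoffSketch sketch = new HandoffSketch();
    new Thread(() -> sketch.addTask(1L, "move block 1001 to ARCHIVE")).start();
    // Blocks until the producer thread publishes the task.
    System.out.println("took: " + sketch.takeOne());
  }
}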
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 2c99963..604fb4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -33,7 +33,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -65,6 +64,8 @@ import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * StoragePolicySatisfyWorker handles the storage policy satisfier commands.
  * These commands would be issued from NameNode as part of Datanode's heart beat
@@ -82,8 +83,10 @@ public class StoragePolicySatisfyWorker {
 
   private final int moverThreads;
   private final ExecutorService moveExecutor;
-  private final CompletionService<Void> moverExecutorCompletionService;
-  private final List<Future<Void>> moverTaskFutures;
+  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final BlocksMovementsCompletionHandler handler;
+  private final BlockStorageMovementTracker movementTracker;
+  private Daemon movementTrackerThread;
 
   public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
@@ -92,9 +95,13 @@ public class StoragePolicySatisfyWorker {
     moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
     moveExecutor = initializeBlockMoverThreadPool(moverThreads);
-    moverExecutorCompletionService = new ExecutorCompletionService<>(
-        moveExecutor);
-    moverTaskFutures = new ArrayList<>();
+    moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
+    handler = new BlocksMovementsCompletionHandler();
+    movementTracker = new BlockStorageMovementTracker(moverCompletionService,
+        handler);
+    movementTrackerThread = new Daemon(movementTracker);
+    movementTrackerThread.setName("BlockStorageMovementTracker");
+    movementTrackerThread.start();
     // TODO: Needs to manage the number of concurrent moves per DataNode.
   }
 
@@ -133,10 +140,6 @@ public class StoragePolicySatisfyWorker {
    * separate thread. Each task will move the block replica to the target node &
    * wait for the completion.
    *
-   * TODO: Presently this function is a blocking call, this has to be refined by
-   * moving the tracking logic to another tracker thread. HDFS-10884 jira
-   * addresses the same.
-   *
    * @param trackID
    *          unique tracking identifier
    * @param blockPoolID
@@ -146,68 +149,64 @@ public class StoragePolicySatisfyWorker {
    */
   public void processBlockMovingTasks(long trackID, String blockPoolID,
       Collection<BlockMovingInfo> blockMovingInfos) {
-    Future<Void> moveCallable = null;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       assert blkMovingInfo
           .getSources().length == blkMovingInfo.getTargets().length;
 
       for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
         BlockMovingTask blockMovingTask = new BlockMovingTask(
-            blkMovingInfo.getBlock(), blockPoolID,
+            trackID, blockPoolID, blkMovingInfo.getBlock(),
             blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
+            blkMovingInfo.getSourceStorageTypes()[i],
             blkMovingInfo.getTargetStorageTypes()[i]);
-        moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
-        moverTaskFutures.add(moveCallable);
-      }
-    }
-
-    for (int i = 0; i < moverTaskFutures.size(); i++) {
-      try {
-        moveCallable = moverExecutorCompletionService.take();
-        moveCallable.get();
-      } catch (InterruptedException | ExecutionException e) {
-        // TODO: Failure retries and report back the error to NameNode.
-        LOG.error("Exception while moving block replica to target storage type",
-            e);
+        Future<BlockMovementResult> moveCallable = moverCompletionService
+            .submit(blockMovingTask);
+        movementTracker.addBlock(trackID, moveCallable);
       }
     }
   }
 
   /**
    * This class encapsulates the process of moving the block replica to the
-   * given target.
+   * given target and waiting for the response.
    */
-  private class BlockMovingTask implements Callable<Void> {
+  private class BlockMovingTask implements Callable<BlockMovementResult> {
+    private final long trackID;
+    private final String blockPoolID;
     private final Block block;
     private final DatanodeInfo source;
     private final DatanodeInfo target;
+    private final StorageType srcStorageType;
     private final StorageType targetStorageType;
-    private String blockPoolID;
 
-    BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
-        DatanodeInfo target, StorageType targetStorageType) {
-      this.block = block;
+    BlockMovingTask(long trackID, String blockPoolID, Block block,
+        DatanodeInfo source, DatanodeInfo target,
+        StorageType srcStorageType, StorageType targetStorageType) {
+      this.trackID = trackID;
       this.blockPoolID = blockPoolID;
+      this.block = block;
       this.source = source;
       this.target = target;
+      this.srcStorageType = srcStorageType;
       this.targetStorageType = targetStorageType;
     }
 
     @Override
-    public Void call() {
-      moveBlock();
-      return null;
+    public BlockMovementResult call() {
+      BlockMovementStatus status = moveBlock();
+      return new BlockMovementResult(trackID, block.getBlockId(), target,
+          status);
     }
 
-    private void moveBlock() {
-      LOG.info("Start moving block {}", block);
-
-      LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy "
-          + "storageType:{}", block, source, target, targetStorageType);
+    private BlockMovementStatus moveBlock() {
+      LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+              + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+          block, source, target, srcStorageType, targetStorageType);
       Socket sock = null;
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
+        ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         DNConf dnConf = datanode.getDnConf();
         String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
         sock = datanode.newSocket();
@@ -218,7 +217,6 @@ public class StoragePolicySatisfyWorker {
 
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-        ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
             extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 
@@ -239,12 +237,14 @@ public class StoragePolicySatisfyWorker {
             "Successfully moved block:{} from src:{} to destin:{} for"
                 + " satisfying storageType:{}",
             block, source, target, targetStorageType);
+        return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
       } catch (IOException e) {
         // TODO: handle failure retries
         LOG.warn(
             "Failed to move block:{} from src:{} to destin:{} to satisfy "
                 + "storageType:{}",
             block, source, target, targetStorageType, e);
+        return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -272,4 +272,102 @@ public class StoragePolicySatisfyWorker {
       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
     }
   }
+
+  /**
+   * Block movement status code.
+   */
+  enum BlockMovementStatus {
+    /** Success. */
+    DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
+    /**
+     * Failure due to generation timestamp mismatches, network errors,
+     * or no available space.
+     */
+    DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
+
+  // TODO: need to support different types of failures. Failure due to network
+  // errors, pinned blocks, no space available, etc.
+
+    private final int code;
+
+    private BlockMovementStatus(int code) {
+      this.code = code;
+    }
+
+    /**
+     * @return the status code.
+     */
+    int getStatusCode() {
+      return code;
+    }
+  }
+
+  /**
+   * This class represents the result of a block movement task. It holds the
+   * details of the task and whether it succeeded or failed.
+   */
+  static class BlockMovementResult {
+    private final long trackId;
+    private final long blockId;
+    private final DatanodeInfo target;
+    private final BlockMovementStatus status;
+
+    public BlockMovementResult(long trackId, long blockId,
+        DatanodeInfo target, BlockMovementStatus status) {
+      this.trackId = trackId;
+      this.blockId = blockId;
+      this.target = target;
+      this.status = status;
+    }
+
+    long getTrackId() {
+      return trackId;
+    }
+
+    long getBlockId() {
+      return blockId;
+    }
+
+    BlockMovementStatus getStatus() {
+      return status;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("Block movement result(\n  ")
+          .append("track id: ").append(trackId).append(" block id: ")
+          .append(blockId).append(" target node: ").append(target)
+          .append(" movement status: ").append(status).append(")").toString();
+    }
+  }
+
+  /**
+   * Blocks movements completion handler, which collects the details of
+   * completed block movements and notifies the namenode about their
+   * success or failure.
+   */
+  static class BlocksMovementsCompletionHandler {
+    private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
+
+    /**
+     * Collect all the block movement results and notify the namenode.
+     *
+     * @param results
+     *          result of all the block movements per trackId
+     */
+    void handle(List<BlockMovementResult> results) {
+      completedBlocks.addAll(results);
+      // TODO: notify namenode about the success/failures.
+    }
+
+    @VisibleForTesting
+    List<BlockMovementResult> getCompletedBlocks() {
+      return completedBlocks;
+    }
+  }
+
+  @VisibleForTesting
+  BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
+    return handler;
+  }
 }

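The other structural change in this file is that BlockMovingTask now implements Callable<BlockMovementResult> instead of Callable<Void>: each task translates its own success or failure into a status value, so the tracker's future.get() no longer has to unwrap an ExecutionException for an ordinary move failure. A minimal sketch of that result-carrying Callable pattern, with illustrative names only:

import java.io.IOException;
import java.util.concurrent.Callable;

public class MoveTaskSketch implements Callable<MoveTaskSketch.Status> {
  enum Status { SUCCESS, FAILURE }

  private final long blockId;

  MoveTaskSketch(long blockId) {
    this.blockId = blockId;
  }

  @Override
  public Status call() {
    try {
      transfer(); // stand-in for the socket-based copy in moveBlock()
      return Status.SUCCESS;
    } catch (IOException e) {
      // An ordinary move failure becomes a normal return value, so the
      // tracker's future.get() never throws for it.
      return Status.FAILURE;
    }
  }

  // Hypothetical transfer that fails for negative ids, standing in for
  // "no space available on the target" and similar IO errors.
  private void transfer() throws IOException {
    if (blockId < 0) {
      throw new IOException("simulated move failure");
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(new MoveTaskSketch(1001L).call()); // SUCCESS
    System.out.println(new MoveTaskSketch(-1L).call());   // FAILURE
  }
}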
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 7c97f1a..5dcf4e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -35,10 +35,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  * service. After the block movement this DataNode sends response back to the
  * NameNode about the movement status.
  *
- * The coordinator datanode will use 'trackId' identifier to coordinate the block
- * movement of the given set of blocks. TrackId is a unique identifier that
- * represents a group of blocks. Namenode will generate this unique value and
- * send it to the coordinator datanode along with the
+ * The coordinator datanode will use 'trackId' identifier to coordinate the
+ * block movement of the given set of blocks. TrackId is a unique identifier
+ * that represents a group of blocks. Namenode will generate this unique value
+ * and send it to the coordinator datanode along with the
  * BlockStorageMovementCommand. Datanode will monitor the completion of the
  * block movements that grouped under this trackId and notifies Namenode about
  * the completion status.
@@ -153,11 +153,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
       return new StringBuilder().append("BlockMovingInfo(\n  ")
           .append("Moving block: ").append(blk).append(" From: ")
           .append(Arrays.asList(sourceNodes)).append(" To: [")
-          .append(Arrays.asList(targetNodes)).append(")\n")
+          .append(Arrays.asList(targetNodes)).append("\n  ")
           .append(" sourceStorageTypes: ")
           .append(Arrays.toString(sourceStorageTypes))
           .append(" targetStorageTypes: ")
-          .append(Arrays.toString(targetStorageTypes)).toString();
+          .append(Arrays.toString(targetStorageTypes)).append(")").toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index d803f1a..ea3eec3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -33,10 +34,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,8 +57,9 @@ public class TestStoragePolicySatisfyWorker {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(TestStoragePolicySatisfyWorker.class);
-
   private static final int DEFAULT_BLOCK_SIZE = 100;
+  private MiniDFSCluster cluster = null;
+  private final Configuration conf = new HdfsConfiguration();
 
   private static void initConf(Configuration conf) {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
@@ -63,64 +70,141 @@ public class TestStoragePolicySatisfyWorker {
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }
 
+  @Before
+  public void setUp() throws IOException {
+    initConf(conf);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Tests to verify that the block replica is moving to ARCHIVE storage type to
    * fulfill the storage policy requirement.
    */
   @Test(timeout = 120000)
   public void testMoveSingleBlockToAnotherDatanode() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    initConf(conf);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(4)
-            .storageTypes(
-                new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
-                    {StorageType.DISK, StorageType.ARCHIVE},
-                    {StorageType.ARCHIVE, StorageType.ARCHIVE},
-                    {StorageType.ARCHIVE, StorageType.ARCHIVE}})
-            .build();
-    try {
-      cluster.waitActive();
-      final DistributedFileSystem dfs = cluster.getFileSystem();
-      final String file = "/testMoveSingleBlockToAnotherDatanode";
-      // write to DISK
-      final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
-      out.writeChars("testMoveSingleBlockToAnotherDatanode");
-      out.close();
-
-      // verify before movement
-      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-      StorageType[] storageTypes = lb.getStorageTypes();
-      for (StorageType storageType : storageTypes) {
-        Assert.assertTrue(StorageType.DISK == storageType);
-      }
-      // move to ARCHIVE
-      dfs.setStoragePolicy(new Path(file), "COLD");
-
-      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
-      DataNode src = cluster.getDataNodes().get(3);
-      DatanodeInfo targetDnInfo = DFSTestUtil
-          .getLocalDatanodeInfo(src.getXferPort());
-
-      // TODO: Need to revisit this when NN is implemented to be able to send
-      // block moving commands.
-      StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
-          src);
-      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-      BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
-          lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
-          lb.getStorageTypes()[0], StorageType.ARCHIVE);
-      blockMovingInfos.add(blockMovingInfo);
-      INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
-      worker.processBlockMovingTasks(inode.getId(),
-          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
-      cluster.triggerHeartbeats();
-
-      // Wait till NameNode notified about the block location details
-      waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000);
-    } finally {
-      cluster.shutdown();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+        .build();
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String file = "/testMoveSingleBlockToAnotherDatanode";
+    // write to DISK
+    final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
+    out.writeChars("testMoveSingleBlockToAnotherDatanode");
+    out.close();
+
+    // verify before movement
+    LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+    StorageType[] storageTypes = lb.getStorageTypes();
+    for (StorageType storageType : storageTypes) {
+      Assert.assertTrue(StorageType.DISK == storageType);
+    }
+    // move to ARCHIVE
+    dfs.setStoragePolicy(new Path(file), "COLD");
+
+    FSNamesystem namesystem = cluster.getNamesystem();
+    INode inode = namesystem.getFSDirectory().getINode(file);
+    namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+    cluster.triggerHeartbeats();
+
+    // Wait till NameNode notified about the block location details
+    waitForLocatedBlockWithArchiveStorageType(dfs, file, 2, 30000);
+  }
+
+  /**
+   * Test to verify that the satisfy worker can't move blocks if the specified
+   * target datanode doesn't have enough space to accommodate the moving block.
+   */
+  @Test(timeout = 120000)
+  public void testMoveWithNoSpaceAvailable() throws Exception {
+    final long capacity = 150;
+    final String rack0 = "/rack0";
+    final String rack1 = "/rack1";
+    long[] capacities = new long[] {capacity, capacity, capacity / 2};
+    String[] hosts = {"host0", "host1", "host2"};
+    String[] racks = {rack0, rack1, rack0};
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numOfDatanodes)
+        .hosts(hosts).racks(racks).simulatedCapacities(capacities)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+        .build();
+
+    cluster.waitActive();
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[3];
+    for (int i = 0; i < favoredNodes.length; i++) {
+      // DFSClient will attempt reverse lookup. In case it resolves
+      // "127.0.0.1" to "localhost", we manually specify the hostname.
+      favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+    }
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String file = "/testMoveWithNoSpaceAvailable";
+    DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 100,
+        DEFAULT_BLOCK_SIZE, (short) 2, 0, false, favoredNodes);
+
+    // verify before movement
+    LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+    StorageType[] storageTypes = lb.getStorageTypes();
+    for (StorageType storageType : storageTypes) {
+      Assert.assertTrue(StorageType.DISK == storageType);
     }
+
+    // move to ARCHIVE
+    dfs.setStoragePolicy(new Path(file), "COLD");
+
+    lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+    DataNode src = cluster.getDataNodes().get(2);
+    DatanodeInfo targetDnInfo = DFSTestUtil
+        .getLocalDatanodeInfo(src.getXferPort());
+
+    StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
+        src);
+    List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+    BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
+        lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
+        lb.getStorageTypes()[0], StorageType.ARCHIVE);
+    blockMovingInfos.add(blockMovingInfo);
+    INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+    worker.processBlockMovingTasks(inode.getId(),
+        cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+
+    waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+  }
+
+  private void waitForBlockMovementCompletion(
+      final StoragePolicySatisfyWorker worker, final long inodeId,
+      int expectedFailedItemsCount, int timeout) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        List<BlockMovementResult> completedBlocks = worker
+            .getBlocksMovementsCompletionHandler().getCompletedBlocks();
+        int failedCount = 0;
+        for (BlockMovementResult blockMovementResult : completedBlocks) {
+          if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE ==
+              blockMovementResult.getStatus()) {
+            failedCount++;
+          }
+        }
+        LOG.info("Block movement completed count={}, expected={} and actual={}",
+            completedBlocks.size(), expectedFailedItemsCount, failedCount);
+        return expectedFailedItemsCount == failedCount;
+      }
+    }, 100, timeout);
   }
 
   private void waitForLocatedBlockWithArchiveStorageType(
@@ -150,7 +234,7 @@ public class TestStoragePolicySatisfyWorker {
     }, 100, timeout);
   }
 
-  BlockMovingInfo prepareBlockMovingInfo(Block block,
+  private BlockMovingInfo prepareBlockMovingInfo(Block block,
       DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
       StorageType targetStorageType) {
     return new BlockMovingInfo(block, new DatanodeInfo[] {src},

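Both tests poll for the expected state through GenericTestUtils.waitFor, which re-evaluates a boolean condition at a fixed interval until it holds or the timeout expires. A rough plain-Java stand-in for that polling helper, assuming nothing from Hadoop (the PollUtil name and the Callable-based signature are this sketch's own, not Hadoop's):

import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public final class PollUtil {
  private PollUtil() {
  }

  // Poll `check` every `intervalMs` until it returns true, or fail with a
  // TimeoutException once `timeoutMs` has elapsed.
  public static void waitFor(Callable<Boolean> check, long intervalMs,
      long timeoutMs) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!check.call()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}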
