Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:09 UTC

[41/50] [abbrv] hadoop git commit: HDFS-10800: [SPS]: Daemon thread in Namenode to find blocks placed in other storage than what the policy specifies. Contributed by Uma Maheswara Rao G

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3bb1c040
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3bb1c040
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3bb1c040

Branch: refs/heads/HDFS-10285
Commit: 3bb1c04093fc09ee4d8163bebbdae78804159d0d
Parents: 9ec39c6
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Fri Sep 23 13:41:29 2016 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:12 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  41 ++
 .../server/blockmanagement/BlockManager.java    |  20 +
 .../blockmanagement/DatanodeDescriptor.java     |  38 ++
 .../server/blockmanagement/DatanodeManager.java |   7 +
 .../datanode/StoragePolicySatisfyWorker.java    |  29 +-
 .../namenode/BlockStorageMovementNeeded.java    |  53 +++
 .../server/namenode/StoragePolicySatisfier.java | 397 +++++++++++++++++++
 .../protocol/BlockStorageMovementCommand.java   |  11 +-
 .../TestStoragePolicySatisfyWorker.java         |  24 +-
 .../namenode/TestStoragePolicySatisfier.java    | 209 ++++++++++
 10 files changed, 797 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 23166e2..dc6f0d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -48,6 +48,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -68,6 +69,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -1583,4 +1585,43 @@ public class DFSUtil {
         .createKeyProviderCryptoExtension(keyProvider);
     return cryptoProvider;
   }
+
+  /**
+   * Remove the overlap between the expected types and the existing types.
+   *
+   * @param expected
+   *          - Expected storage types list.
+   * @param existing
+   *          - Existing storage types list.
+   * @param ignoreNonMovable
+   *          ignore non-movable storage types by removing them from both
+   *          expected and existing storage type list to prevent non-movable
+   *          storage from being moved.
+   * @return true if the existing or the expected storage types list is
+   *          empty after removing the overlap.
+   */
+  public static boolean removeOverlapBetweenStorageTypes(
+      List<StorageType> expected,
+      List<StorageType> existing, boolean ignoreNonMovable) {
+    for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
+      final StorageType t = i.next();
+      if (expected.remove(t)) {
+        i.remove();
+      }
+    }
+    if (ignoreNonMovable) {
+      removeNonMovable(existing);
+      removeNonMovable(expected);
+    }
+    return expected.isEmpty() || existing.isEmpty();
+  }
+
+  private static void removeNonMovable(List<StorageType> types) {
+    for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
+      final StorageType t = i.next();
+      if (!t.isMovable()) {
+        i.remove();
+      }
+    }
+  }
 }
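
For illustration, a worked example of the overlap removal above (a minimal
sketch, not part of the patch, assuming the patched DFSUtil is on the
classpath): a block whose policy expects three ARCHIVE replicas, but whose
replicas currently sit on DISK, DISK and ARCHIVE, keeps only the unsatisfied
portion in each list.

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.List;

  import org.apache.hadoop.fs.StorageType;
  import org.apache.hadoop.hdfs.DFSUtil;

  public class RemoveOverlapExample {
    public static void main(String[] args) {
      // Policy COLD expects three ARCHIVE replicas.
      List<StorageType> expected = new ArrayList<>(Arrays.asList(
          StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE));
      // The replicas currently sit on DISK, DISK and ARCHIVE.
      List<StorageType> existing = new ArrayList<>(Arrays.asList(
          StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE));
      // The matching ARCHIVE pair is removed from both lists; DISK and
      // ARCHIVE are both movable, so ignoreNonMovable removes nothing here.
      boolean satisfied = DFSUtil.removeOverlapBetweenStorageTypes(
          expected, existing, true);
      // Prints: false expected=[ARCHIVE, ARCHIVE] existing=[DISK, DISK],
      // i.e. two DISK replicas still need to move to ARCHIVE.
      System.out.println(satisfied + " expected=" + expected
          + " existing=" + existing);
    }
  }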

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e60703b..4a920e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -366,6 +368,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
+  /** For satisfying block storage policies. */
+  private final StoragePolicySatisfier sps;
+  private final BlockStorageMovementNeeded storageMovementNeeded =
+      new BlockStorageMovementNeeded();
+
   /** Minimum live replicas needed for the datanode to be transitioned
    * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
    */
@@ -401,6 +408,7 @@ public class BlockManager implements BlockStatsMXBean {
         DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
         * 1000L);
 
+    sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
 
     this.maxCorruptFilesReturned = conf.getInt(
@@ -617,9 +625,11 @@ public class BlockManager implements BlockStatsMXBean {
     this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
+    sps.start();
   }
 
   public void close() {
+    sps.stop();
     bmSafeMode.close();
     try {
       redundancyThread.interrupt();
@@ -4707,4 +4717,14 @@ public class BlockManager implements BlockStatsMXBean {
     }
     return i;
   }
+
+  /**
+   * Mark a file block collection as needing storage movement for its blocks.
+   *
+   * @param id
+   *          - file block collection id.
+   */
+  public void satisfyStoragePolicy(long id) {
+    storageMovementNeeded.add(id);
+  }
 }
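
Note that satisfyStoragePolicy(long) only enqueues the inode id; the SPS
daemon picks it up asynchronously. A hypothetical namenode-side caller
(sketch only; the client-facing RPC is not part of this patch, and
requestSatisfyStoragePolicy is a made-up helper name) mirrors what
TestStoragePolicySatisfier does below:

  // Assumes the usual o.a.h.hdfs.server.namenode imports; 'namesystem'
  // stands in for the active FSNamesystem.
  static void requestSatisfyStoragePolicy(FSNamesystem namesystem, String path)
      throws IOException {
    INode inode = namesystem.getFSDirectory().getINode(path);
    if (inode != null) {
      // Only queues the block-collection (inode) id; the daemon's run()
      // loop drains the queue and computes the block movements.
      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
    }
  }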

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 320c680..294e2c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -203,6 +205,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
 
+  /** A queue of block lists whose storage placements this datanode must move. */
+  private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
+      new LinkedList<>();
+
   /* Variables for maintaining number of blocks scheduled to be written to
    * this storage. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
@@ -928,5 +934,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public boolean isRegistered() {
     return isAlive() && !forceRegistration;
   }
+
+  /**
+   * Add the block infos whose storage locations need to be moved.
+   *
+   * @param storageMismatchedBlocks
+   *          - storage mismatched block infos
+   */
+  public void addBlocksToMoveStorage(
+      List<BlockMovingInfo> storageMismatchedBlocks) {
+    storageMovementBlocks.offer(storageMismatchedBlocks);
+  }
+
+  /**
+   * @return block infos whose storage locations need to be moved.
+   */
+  public List<BlockMovingInfo> getBlocksToMoveStorages() {
+    return storageMovementBlocks.poll();
+  }
+
+  // TODO: remove this method once the DN side handling is integrated; the
+  // test can then check real block movements instead of this data structure.
+  @VisibleForTesting
+  public List<BlockMovingInfo> getStorageMovementPendingItems() {
+    List<BlockMovingInfo> flatList = new ArrayList<>();
+    Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
+        .iterator();
+    while (iterator.hasNext()) {
+      List<BlockMovingInfo> next = iterator.next();
+      flatList.addAll(next);
+    }
+    return flatList;
+  }
 }
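
The queue added here has batch semantics: addBlocksToMoveStorage() offers one
list per call, and getBlocksToMoveStorages() polls one list per heartbeat,
returning null when nothing is pending. A small sketch (dn, batch1 and batch2
are stand-ins for a co-ordinator DatanodeDescriptor and two computed
List<BlockMovingInfo> batches):

  dn.addBlocksToMoveStorage(batch1);                         // offer()
  dn.addBlocksToMoveStorage(batch2);
  List<BlockMovingInfo> next = dn.getBlocksToMoveStorages(); // batch1 (FIFO)
  next = dn.getBlocksToMoveStorages();                       // batch2
  next = dn.getBlocksToMoveStorages();                       // null: queue empty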
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cc64a04..49b78e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -1632,6 +1633,12 @@ public class DatanodeManager {
       nodeinfo.setBalancerBandwidth(0);
     }
 
+    List<BlockMovingInfo> blocksToMoveStorages = nodeinfo
+        .getBlocksToMoveStorages();
+    if (blocksToMoveStorages != null) {
+      // TODO: create BlockStorageMovementCommand and add here in response.
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
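
One possible shape for the TODO above, once the command wiring lands. Both
the DNA_BLOCK_STORAGE_MOVEMENT action constant and the list-carrying
constructor are hypothetical; this patch only defines the int-action
constructor:

  List<BlockMovingInfo> blocksToMoveStorages = nodeinfo
      .getBlocksToMoveStorages();
  if (blocksToMoveStorages != null) {
    // Ship the pending batch to the co-ordinator datanode in the
    // heartbeat response.
    cmds.add(new BlockStorageMovementCommand(
        DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blocksToMoveStorages));
  }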

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 6df4e81..fa408f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@@ -125,7 +126,7 @@ public class StoragePolicySatisfyWorker {
     return moverThreadPool;
   }
 
-  public void processBlockMovingTasks(long trackID,
+  public void processBlockMovingTasks(long trackID, String blockPoolID,
       List<BlockMovingInfo> blockMovingInfos) {
     Future<Void> moveCallable = null;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
@@ -133,13 +134,11 @@ public class StoragePolicySatisfyWorker {
           .getSources().length == blkMovingInfo.getTargets().length;
 
       for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
-        BlockMovingTask blockMovingTask =
-            new BlockMovingTask(blkMovingInfo.getBlock(),
-            blkMovingInfo.getSources()[i],
-            blkMovingInfo.getTargets()[i],
+        BlockMovingTask blockMovingTask = new BlockMovingTask(
+            blkMovingInfo.getBlock(), blockPoolID,
+            blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
             blkMovingInfo.getTargetStorageTypes()[i]);
-        moveCallable = moverExecutorCompletionService
-            .submit(blockMovingTask);
+        moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
         moverTaskFutures.add(moveCallable);
       }
     }
@@ -163,14 +162,16 @@ public class StoragePolicySatisfyWorker {
    * given target.
    */
   private class BlockMovingTask implements Callable<Void> {
-    private final ExtendedBlock block;
+    private final Block block;
     private final DatanodeInfo source;
     private final DatanodeInfo target;
     private final StorageType targetStorageType;
+    private String blockPoolID;
 
-    BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
+    BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
         DatanodeInfo target, StorageType targetStorageType) {
       this.block = block;
+      this.blockPoolID = blockPoolID;
       this.source = source;
       this.target = target;
       this.targetStorageType = targetStorageType;
@@ -201,12 +202,12 @@ public class StoragePolicySatisfyWorker {
 
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-
+        ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
-            block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+            extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 
         DataEncryptionKeyFactory keyFactory = datanode
-            .getDataEncryptionKeyFactoryForBlock(block);
+            .getDataEncryptionKeyFactoryForBlock(extendedBlock);
         IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
             unbufOut, unbufIn, keyFactory, accessToken, target);
         unbufOut = saslStreams.out;
@@ -215,10 +216,10 @@ public class StoragePolicySatisfyWorker {
             new BufferedOutputStream(unbufOut, ioFileBufferSize));
         in = new DataInputStream(
             new BufferedInputStream(unbufIn, ioFileBufferSize));
-        sendRequest(out, block, accessToken, source, targetStorageType);
+        sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
         receiveResponse(in);
 
-        LOG.debug(
+        LOG.info(
             "Successfully moved block:{} from src:{} to destin:{} for"
                 + " satisfying storageType:{}",
             block, source, target, targetStorageType);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
new file mode 100644
index 0000000..c916672
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A class to track the block collection IDs for which physical storage
+ * movement is needed, based on the namespace and the StorageReports from DNs.
+ */
+@InterfaceAudience.Private
+public class BlockStorageMovementNeeded {
+  private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();
+
+  /**
+   * Add a block collection id to the tracking list if its blocks may need
+   * storage movement.
+   *
+   * @param blockCollectionID
+   *          - block collection id, which is simply the inode id.
+   */
+  public synchronized void add(Long blockCollectionID) {
+    storageMovementNeeded.add(blockCollectionID);
+  }
+
+  /**
+   * Gets the next block collection id whose storage placement should be
+   * checked and, if required, moved.
+   *
+   * @return block collection ID
+   */
+  public synchronized Long get() {
+    return storageMovementNeeded.poll();
+  }
+}
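
Usage of this queue is a plain producer/consumer handoff:
BlockManager#satisfyStoragePolicy is the producer and the
StoragePolicySatisfier daemon the consumer. A minimal sketch (the inode id is
made up):

  BlockStorageMovementNeeded needed = new BlockStorageMovementNeeded();
  needed.add(16386L);        // producer side: inode id of the file to satisfy
  Long id = needed.get();    // consumer side: returns 16386
  Long none = needed.get();  // returns null once the queue is empty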

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
new file mode 100644
index 0000000..b5aed37
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Setting a storage policy on a file after it has been written only updates
+ * the new policy in the Namespace; the physical blocks will not move until
+ * the user explicitly runs the "Mover Tool" on such files. The
+ * StoragePolicySatisfier daemon thread is implemented to address the case
+ * where users want HDFS itself to physically move the blocks instead of
+ * running the mover tool explicitly. Simply calling the client API
+ * satisfyStoragePolicy on a file/dir will trigger the movement of its
+ * physical storage locations asynchronously. The Namenode picks the file
+ * blocks whose storages are expected to change, builds a mapping from the
+ * source block locations to the expected storage types and target locations,
+ * and then this class also prepares the commands to send to the Datanodes
+ * for processing the physical block movements.
+ */
+@InterfaceAudience.Private
+public class StoragePolicySatisfier implements Runnable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(StoragePolicySatisfier.class);
+  private Daemon storagePolicySatisfierThread;
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final BlockStorageMovementNeeded storageMovementNeeded;
+
+  public StoragePolicySatisfier(final Namesystem namesystem,
+      final BlockStorageMovementNeeded storageMovementNeeded,
+      final BlockManager blkManager) {
+    this.namesystem = namesystem;
+    this.storageMovementNeeded = storageMovementNeeded;
+    this.blockManager = blkManager;
+  }
+
+  /**
+   * Start the storage policy satisfier daemon thread.
+   */
+  public void start() {
+    storagePolicySatisfierThread = new Daemon(this);
+    storagePolicySatisfierThread.setName("StoragePolicySatisfier");
+    storagePolicySatisfierThread.start();
+  }
+
+  /**
+   * Stop the storage policy satisfier daemon thread.
+   */
+  public void stop() {
+    if (storagePolicySatisfierThread == null) {
+      return;
+    }
+    storagePolicySatisfierThread.interrupt();
+    try {
+      storagePolicySatisfierThread.join(3000);
+    } catch (InterruptedException ie) {
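+      // Ignored; the satisfier thread is being stopped anyway.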
+    }
+  }
+
+  @Override
+  public void run() {
+    while (namesystem.isRunning()) {
+      try {
+        Long blockCollectionID = storageMovementNeeded.get();
+        if (blockCollectionID != null) {
+          computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
+        }
+        // TODO: how frequently we check for block movements can be made
+        // configurable later.
+        Thread.sleep(3000);
+      } catch (Throwable t) {
+        if (!namesystem.isRunning()) {
+          LOG.info("Stopping StoragePolicySatisfier.");
+          if (!(t instanceof InterruptedException)) {
+            LOG.info("StoragePolicySatisfier received an exception"
+                + " while shutting down.", t);
+          }
+          break;
+        }
+        LOG.error("StoragePolicySatisfier thread received a runtime exception."
+            + " Stopping StoragePolicySatisfier work", t);
+        // TODO: Just break for now. Once we implement dynamic start/stop
+        // option, we can add conditions here when to break/terminate.
+        break;
+      }
+    }
+  }
+
+  private void computeAndAssignStorageMismatchedBlocksToDNs(
+      long blockCollectionID) {
+    BlockCollection blockCollection =
+        namesystem.getBlockCollection(blockCollectionID);
+    if (blockCollection == null) {
+      return;
+    }
+    byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
+    BlockStoragePolicy existingStoragePolicy =
+        blockManager.getStoragePolicy(existingStoragePolicyID);
+    if (!blockCollection.getLastBlock().isComplete()) {
+      // Postpone: the file is currently under construction.
+      // TODO: should we add it back to the queue, or leave the retry to the user?
+      return;
+    }
+
+    // First datanode will be chosen as the co-ordinator node for storage
+    // movements. Later this can be optimized if needed.
+    DatanodeDescriptor coordinatorNode = null;
+    BlockInfo[] blocks = blockCollection.getBlocks();
+    List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
+    for (int i = 0; i < blocks.length; i++) {
+      BlockInfo blockInfo = blocks[i];
+      List<StorageType> expectedStorageTypes =
+          existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
+      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+      StorageType[] storageTypes = new StorageType[storages.length];
+      for (int j = 0; j < storages.length; j++) {
+        DatanodeStorageInfo datanodeStorageInfo = storages[j];
+        StorageType storageType = datanodeStorageInfo.getStorageType();
+        storageTypes[j] = storageType;
+      }
+      List<StorageType> existing =
+          new LinkedList<StorageType>(Arrays.asList(storageTypes));
+      if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+          existing, true)) {
+        List<StorageTypeNodePair> sourceWithStorageMap =
+            new ArrayList<StorageTypeNodePair>();
+        List<DatanodeStorageInfo> existingBlockStorages =
+            new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+        for (StorageType existingType : existing) {
+          Iterator<DatanodeStorageInfo> iterator =
+              existingBlockStorages.iterator();
+          while (iterator.hasNext()) {
+            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+            StorageType storageType = datanodeStorageInfo.getStorageType();
+            if (storageType == existingType) {
+              iterator.remove();
+              sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+                  datanodeStorageInfo.getDatanodeDescriptor()));
+              break;
+            }
+          }
+        }
+
+        StorageTypeNodeMap locsForExpectedStorageTypes =
+            findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+        BlockMovingInfo blockMovingInfo =
+            findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
+                expectedStorageTypes, locsForExpectedStorageTypes);
+        if (coordinatorNode == null) {
+          // For now, first datanode will be chosen as the co-ordinator. Later
+          // this can be optimized if needed.
+          coordinatorNode =
+              (DatanodeDescriptor) blockMovingInfo.getSources()[0];
+        }
+        blockMovingInfos.add(blockMovingInfo);
+      }
+    }
+
+    if (blockMovingInfos.size() < 1) {
+      // TODO: Major: handle this case. Retries need to be implemented: if
+      // some files never get a chance at storage movement, we can retry a
+      // limited number of times and then exit.
+      return;
+    }
+    coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
+  }
+
+  /**
+   * Find a good target node for each source node whose block storage is
+   * misplaced.
+   *
+   * @param blockInfo
+   *          - Block
+   * @param existing
+   *          - Existing storage types of block
+   * @param sourceWithStorageList
+   *          - Source Datanode with storages list
+   * @param expected
+   *          - Expected storage types to move to
+   * @param locsForExpectedStorageTypes
+   *          - Available DNs for expected storage types
+   * @return the block with its lists of source and target node pairs
+   */
+  private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
+      List<StorageType> existing,
+      List<StorageTypeNodePair> sourceWithStorageList,
+      List<StorageType> expected,
+      StorageTypeNodeMap locsForExpectedStorageTypes) {
+    List<DatanodeInfo> sourceNodes = new ArrayList<>();
+    List<StorageType> sourceStorageTypes = new ArrayList<>();
+    List<DatanodeInfo> targetNodes = new ArrayList<>();
+    List<StorageType> targetStorageTypes = new ArrayList<>();
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+    for (int i = 0; i < sourceWithStorageList.size(); i++) {
+      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+      StorageTypeNodePair chosenTarget =
+          chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
+              locsForExpectedStorageTypes, chosenNodes);
+
+      if (chosenTarget == null && blockManager.getDatanodeManager()
+          .getNetworkTopology().isNodeGroupAware()) {
+        chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
+            expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
+            chosenNodes);
+      }
+
+      // Then, match nodes on the same rack
+      if (chosenTarget == null) {
+        chosenTarget =
+            chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+                Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
+      }
+
+      if (chosenTarget == null) {
+        chosenTarget =
+            chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+                Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
+      }
+      if (null != chosenTarget) {
+        sourceNodes.add(existingTypeNodePair.dn);
+        sourceStorageTypes.add(existingTypeNodePair.storageType);
+        targetNodes.add(chosenTarget.dn);
+        targetStorageTypes.add(chosenTarget.storageType);
+        chosenNodes.add(chosenTarget.dn);
+        // TODO: should we increment the scheduled block count for this node?
+      } else {
+        // TODO: Failed to choose target nodes, so just retry. Should we
+        // proceed without these targets? Then what should the final result
+        // be? One option is to pack an empty target, meaning no target node
+        // could be chosen, so the DN should always report RETRY_REQUIRED.
+        // TODO: log that no target node could be chosen for this source.
+        sourceNodes.add(existingTypeNodePair.dn);
+        sourceStorageTypes.add(existingTypeNodePair.storageType);
+        targetNodes.add(null);
+        targetStorageTypes.add(null);
+      }
+    }
+    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
+        sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
+        targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
+        sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
+        targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
+    return blkMovingInfo;
+  }
+
+  /**
+   * Choose the target storage within same Datanode if possible.
+   *
+   * @param locsForExpectedStorageTypes
+   * @param chosenNodes
+   */
+  private StorageTypeNodePair chooseTargetTypeInSameNode(
+      DatanodeDescriptor source, List<StorageType> targetTypes,
+      StorageTypeNodeMap locsForExpectedStorageTypes,
+      List<DatanodeDescriptor> chosenNodes) {
+    for (StorageType t : targetTypes) {
+      DatanodeStorageInfo chooseStorage4Block =
+          source.chooseStorage4Block(t, 0);
+      if (chooseStorage4Block != null) {
+        return new StorageTypeNodePair(t, source);
+      }
+    }
+    return null;
+  }
+
+  private StorageTypeNodePair chooseTarget(Block block,
+      DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
+      StorageTypeNodeMap locsForExpectedStorageTypes,
+      List<DatanodeDescriptor> chosenNodes) {
+    for (StorageType t : targetTypes) {
+      List<DatanodeDescriptor> nodesWithStorages =
+          locsForExpectedStorageTypes.getNodesWithStorages(t);
+      Collections.shuffle(nodesWithStorages);
+      for (DatanodeDescriptor target : nodesWithStorages) {
+        if (!chosenNodes.contains(target) && matcher.match(
+            blockManager.getDatanodeManager().getNetworkTopology(), source,
+            target)) {
+          if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
+            return new StorageTypeNodePair(t, target);
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  private static class StorageTypeNodePair {
+    public StorageType storageType = null;
+    public DatanodeDescriptor dn = null;
+
+    public StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
+      this.storageType = storageType;
+      this.dn = dn;
+    }
+  }
+
+  private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
+      List<StorageType> expected) {
+    StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
+    List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
+        .getDatanodeListForReport(DatanodeReportType.LIVE);
+    for (DatanodeDescriptor dn : reports) {
+      StorageReport[] storageReports = dn.getStorageReports();
+      for (StorageReport storageReport : storageReports) {
+        StorageType t = storageReport.getStorage().getStorageType();
+        if (expected.contains(t)) {
+          final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
+          if (maxRemaining > 0L) {
+            targetMap.add(t, dn);
+          }
+        }
+      }
+    }
+    return targetMap;
+  }
+
+  private static long getMaxRemaining(StorageReport[] storageReports,
+      StorageType t) {
+    long max = 0L;
+    for (StorageReport r : storageReports) {
+      if (r.getStorage().getStorageType() == t) {
+        if (r.getRemaining() > max) {
+          max = r.getRemaining();
+        }
+      }
+    }
+    return max;
+  }
+
+  private static class StorageTypeNodeMap {
+    private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
+        new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
+
+    private void add(StorageType t, DatanodeDescriptor dn) {
+      List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
+      LinkedList<DatanodeDescriptor> value = null;
+      if (nodesWithStorages == null) {
+        value = new LinkedList<DatanodeDescriptor>();
+        value.add(dn);
+        typeNodeMap.put(t, value);
+      } else {
+        nodesWithStorages.add(dn);
+      }
+    }
+
+    /**
+     * @param type
+     *          - Storage type
+     * @return datanodes which have the given storage type
+     */
+    private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
+      return typeNodeMap.get(type);
+    }
+  }
+}
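
For reference, the lifecycle wiring matches the BlockManager changes earlier
in this patch (sketch only; 'namesystem' and 'blockManager' are assumed to be
the live Namesystem and BlockManager instances):

  BlockStorageMovementNeeded needed = new BlockStorageMovementNeeded();
  StoragePolicySatisfier sps =
      new StoragePolicySatisfier(namesystem, needed, blockManager);
  sps.start();  // spawns the "StoragePolicySatisfier" daemon thread
  // ... namenode runs; satisfyStoragePolicy(id) calls feed the queue ...
  sps.stop();   // interrupts the daemon and joins it for up to 3 seconds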

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 42ba265..c1ab800 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs.server.protocol;
 import java.util.Arrays;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * A BlockStorageMovementCommand is an instruction to a DataNode to move the
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
  * NameNode about the movement status.
  */
 public class BlockStorageMovementCommand extends DatanodeCommand {
-
   // TODO: constructor needs to be refined based on the block movement data
   // structure.
   BlockStorageMovementCommand(int action) {
@@ -46,13 +45,13 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
    * Stores block to storage info that can be used for block movement.
    */
   public static class BlockMovingInfo {
-    private ExtendedBlock blk;
+    private Block blk;
     private DatanodeInfo[] sourceNodes;
     private StorageType[] sourceStorageTypes;
     private DatanodeInfo[] targetNodes;
     private StorageType[] targetStorageTypes;
 
-    public BlockMovingInfo(ExtendedBlock block,
+    public BlockMovingInfo(Block block,
         DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
         StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
       this.blk = block;
@@ -62,11 +61,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
       this.targetStorageTypes = targetStorageTypes;
     }
 
-    public void addBlock(ExtendedBlock block) {
+    public void addBlock(Block block) {
       this.blk = block;
     }
 
-    public ExtendedBlock getBlock() {
+    public Block getBlock() {
       return this.blk;
     }
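
Constructing a BlockMovingInfo by hand, as
TestStoragePolicySatisfyWorker#prepareBlockMovingInfo does below, for a single
DISK-to-ARCHIVE source/target pair (block, srcDn and targetDn are stand-ins,
e.g. taken from a LocatedBlock):

  BlockMovingInfo info = new BlockMovingInfo(
      block,                                    // hdfs.protocol.Block
      new DatanodeInfo[] {srcDn},               // current replica location
      new DatanodeInfo[] {targetDn},            // where it should move
      new StorageType[] {StorageType.DISK},     // current storage type
      new StorageType[] {StorageType.ARCHIVE}); // desired storage type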
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index c722306..d803f1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -71,14 +71,14 @@ public class TestStoragePolicySatisfyWorker {
   public void testMoveSingleBlockToAnotherDatanode() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     initConf(conf);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(4)
-        .storageTypes(
-            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE},
-                {StorageType.DISK, StorageType.ARCHIVE}})
-        .build();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(4)
+            .storageTypes(
+                new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+                    {StorageType.DISK, StorageType.ARCHIVE},
+                    {StorageType.ARCHIVE, StorageType.ARCHIVE},
+                    {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+            .build();
     try {
       cluster.waitActive();
       final DistributedFileSystem dfs = cluster.getFileSystem();
@@ -108,12 +108,12 @@ public class TestStoragePolicySatisfyWorker {
           src);
       List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
       BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
-          lb.getBlock(), lb.getLocations()[0], targetDnInfo,
+          lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
           lb.getStorageTypes()[0], StorageType.ARCHIVE);
       blockMovingInfos.add(blockMovingInfo);
       INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
       worker.processBlockMovingTasks(inode.getId(),
-          blockMovingInfos);
+          cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
       cluster.triggerHeartbeats();
 
       // Wait till NameNode notified about the block location details
@@ -150,7 +150,7 @@ public class TestStoragePolicySatisfyWorker {
     }, 100, timeout);
   }
 
-  BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
+  BlockMovingInfo prepareBlockMovingInfo(Block block,
       DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
       StorageType targetStorageType) {
     return new BlockMovingInfo(block, new DatanodeInfo[] {src},

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
new file mode 100644
index 0000000..b61814d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests that the StoragePolicySatisfier daemon is able to identify the blocks
+ * to be moved and to find suggested target locations for them.
+ */
+public class TestStoragePolicySatisfier {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
+  private final Configuration config = new HdfsConfiguration();
+  private StorageType[][] allDiskTypes =
+      new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK},
+          {StorageType.DISK, StorageType.DISK}};
+  private MiniDFSCluster hdfsCluster = null;
+  final private int numOfDatanodes = 3;
+  final private int storagesPerDatanode = 2;
+  final private long capacity = 2 * 256 * 1024 * 1024;
+  final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
+  private DistributedFileSystem distributedFS = null;
+
+  @Before
+  public void setUp() throws IOException {
+    config.setLong("dfs.block.size", 1024);
+    hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
+        storagesPerDatanode, capacity);
+    distributedFS = hdfsCluster.getFileSystem();
+    writeContent(distributedFS, file);
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToCOLD()
+      throws Exception {
+
+    try {
+      // Change policy to COLD
+      distributedFS.setStoragePolicy(new Path(file), "COLD");
+      Set<DatanodeDescriptor> previousNodes =
+          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
+              .getDatanodeManager().getDatanodes();
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the namenode is notified about the block location details
+      waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
+          6, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testWhenStoragePolicySetToALLSSD()
+      throws Exception {
+    try {
+      // Change policy to ALL_SSD
+      distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
+      Set<DatanodeDescriptor> previousNodes =
+          hdfsCluster.getNameNode().getNamesystem().getBlockManager()
+              .getDatanodeManager().getDatanodes();
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK},
+              {StorageType.SSD, StorageType.DISK},
+              {StorageType.SSD, StorageType.DISK}};
+
+      // Make sure SSD based nodes are added to the cluster by adding SSD
+      // based datanodes.
+      startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till the StoragePolicySatisfier identifies the blocks to move
+      // to SSD areas
+      waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
+          30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void writeContent(final DistributedFileSystem dfs,
+      final String fileName) throws IOException {
+    // write to DISK
+    final FSDataOutputStream out = dfs.create(new Path(fileName));
+    for (int i = 0; i < 1000; i++) {
+      out.writeChars("t");
+    }
+    out.close();
+  }
+
+  private void startAdditionalDNs(final Configuration conf,
+      int newNodesRequired, int existingNodesNum, StorageType[][] newTypes,
+      int storagesPerDatanode, long capacity, final MiniDFSCluster cluster)
+          throws IOException {
+    long[][] capacities;
+    existingNodesNum += newNodesRequired;
+    capacities = new long[newNodesRequired][storagesPerDatanode];
+    for (int i = 0; i < newNodesRequired; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+
+    cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+        null, capacities, null, false, false, false, null);
+    cluster.triggerHeartbeats();
+  }
+
+  private MiniDFSCluster startCluster(final Configuration conf,
+      StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+      long nodeCapacity) throws IOException {
+    long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+    for (int i = 0; i < numberOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDn; j++) {
+        capacities[i][j] = nodeCapacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+        .storageTypes(storageTypes).storageCapacities(capacities).build();
+    cluster.waitActive();
+    return cluster;
+  }
+
+  // TODO: this assertion can be changed to an end-to-end assertion later,
+  // when the DN side processing work is integrated.
+  private void waitExpectedStorageType(final StorageType expectedStorageType,
+      final DistributedFileSystem dfs,
+      final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
+      int timeout) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
+        int archiveCount = 0;
+        while (iterator.hasNext()) {
+          DatanodeDescriptor dn = iterator.next();
+          List<BlockMovingInfo> pendingItemsToMove =
+              dn.getStorageMovementPendingItems();
+          for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
+            StorageType[] targetStorageTypes =
+                blkInfoToMoveStorage.getTargetStorageTypes();
+            for (StorageType storageType : targetStorageTypes) {
+              if (storageType == expectedStorageType) {
+                archiveCount++;
+              }
+            }
+          }
+        }
+        LOG.info(
+            expectedStorageType + " replica count, expected={} and actual={}",
+            expectedArchiveCount, archiveCount);
+        return expectedArchiveCount == archiveCount;
+      }
+    }, 100, timeout);
+  }
+}

