Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:20 UTC
[27/50] [abbrv] hadoop git commit: HDFS-12955: [SPS]: Move SPS
classes to a separate package. Contributed by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
new file mode 100644
index 0000000..5635621
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -0,0 +1,572 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A class to track the block collection IDs (inode IDs) for which physical
+ * storage movement is needed, as per the namespace and the storage reports
+ * from the DNs. It scans the pending directories for which storage movement
+ * is required and schedules the block collection IDs for movement. It tracks
+ * the info of scheduled items and removes the SPS xAttr from the
+ * file/directory once the movement succeeds.
+ */
+@InterfaceAudience.Private
+public class BlockStorageMovementNeeded {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
+
+ private final Queue<ItemInfo> storageMovementNeeded =
+ new LinkedList<ItemInfo>();
+
+ /**
+ * Map of startId to the number of children. The number of children
+ * indicates the number of files pending to satisfy the policy.
+ */
+ private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
+ new HashMap<Long, DirPendingWorkInfo>();
+
+ private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
+ new ConcurrentHashMap<>();
+
+ private final Namesystem namesystem;
+
+ // List of pending dirs to satisfy the policy
+ private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+
+ private final StoragePolicySatisfier sps;
+
+ private Daemon inodeIdCollector;
+
+ private final int maxQueuedItem;
+
+ // Amount of time to cache the SUCCESS status of a path before turning it
+ // to NOT_AVAILABLE.
+ private static long statusClearanceElapsedTimeMs = 300000;
+
+ public BlockStorageMovementNeeded(Namesystem namesystem,
+ StoragePolicySatisfier sps, int queueLimit) {
+ this.namesystem = namesystem;
+ this.sps = sps;
+ this.maxQueuedItem = queueLimit;
+ }
+
+ /**
+ * Add the candidate to the tracking list for which storage movement is
+ * expected, if necessary.
+ *
+ * @param trackInfo
+ * - track info for satisfying the policy
+ */
+ public synchronized void add(ItemInfo trackInfo) {
+ spsStatus.put(trackInfo.getStartId(),
+ new StoragePolicySatisfyPathStatusInfo(
+ StoragePolicySatisfyPathStatus.IN_PROGRESS));
+ storageMovementNeeded.add(trackInfo);
+ }
+
+ /**
+ * Add the itemInfo list to the tracking list for which storage movement is
+ * expected, if necessary.
+ * @param startId
+ * - start id
+ * @param itemInfoList
+ * - list of children in the directory
+ * @param scanCompleted
+ * - true if the directory scan is complete
+ */
+ @VisibleForTesting
+ public synchronized void addAll(long startId,
+ List<ItemInfo> itemInfoList, boolean scanCompleted) {
+ storageMovementNeeded.addAll(itemInfoList);
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ if (pendingWork == null) {
+ pendingWork = new DirPendingWorkInfo();
+ pendingWorkForDirectory.put(startId, pendingWork);
+ }
+ pendingWork.addPendingWorkCount(itemInfoList.size());
+ if (scanCompleted) {
+ pendingWork.markScanCompleted();
+ }
+ }
+
+ /**
+ * Gets the next block collection ID for which a storage movement check is
+ * necessary, so that the movement can be made if required.
+ *
+ * @return the next ItemInfo, or null if the queue is empty
+ */
+ public synchronized ItemInfo get() {
+ return storageMovementNeeded.poll();
+ }
+
+ public synchronized void addToPendingDirQueue(long id) {
+ spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo(
+ StoragePolicySatisfyPathStatus.PENDING));
+ spsDirsToBeTraveresed.add(id);
+ // Notify waiting FileInodeIdCollector thread about the newly
+ // added SPS path.
+ synchronized (spsDirsToBeTraveresed) {
+ spsDirsToBeTraveresed.notify();
+ }
+ }
+
+ /**
+ * Returns queue remaining capacity.
+ */
+ public synchronized int remainingCapacity() {
+ int size = storageMovementNeeded.size();
+ if (size >= maxQueuedItem) {
+ return 0;
+ } else {
+ return (maxQueuedItem - size);
+ }
+ }
+
+ /**
+ * Returns queue size.
+ */
+ public synchronized int size() {
+ return storageMovementNeeded.size();
+ }
+
+ public synchronized void clearAll() {
+ spsDirsToBeTraveresed.clear();
+ storageMovementNeeded.clear();
+ pendingWorkForDirectory.clear();
+ }
+
+ /**
+ * Decrease the pending child count for a directory once one file's blocks
+ * have been moved successfully. Remove the SPS xAttr if the pending child
+ * count is zero.
+ */
+ public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
+ boolean isSuccess) throws IOException {
+ if (trackInfo.isDir()) {
+ // If track is part of some start inode then reduce the pending
+ // directory work count.
+ long startId = trackInfo.getStartId();
+ INode inode = namesystem.getFSDirectory().getInode(startId);
+ if (inode == null) {
+ // Directory was deleted, so just remove it.
+ this.pendingWorkForDirectory.remove(startId);
+ updateStatus(startId, isSuccess);
+ } else {
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ if (pendingWork != null) {
+ pendingWork.decrementPendingWorkCount();
+ if (pendingWork.isDirWorkDone()) {
+ namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(startId);
+ pendingWork.setFailure(!isSuccess);
+ updateStatus(startId, pendingWork.isPolicySatisfied());
+ }
+ pendingWork.setFailure(!isSuccess);
+ }
+ }
+ } else {
+ // Remove the xAttr if the trackID doesn't exist in
+ // storageMovementAttemptedItems or the file policy is satisfied.
+ namesystem.removeXattr(trackInfo.getTrackId(),
+ XATTR_SATISFY_STORAGE_POLICY);
+ updateStatus(trackInfo.getStartId(), isSuccess);
+ }
+ }
+
+ public synchronized void clearQueue(long trackId) {
+ spsDirsToBeTraveresed.remove(trackId);
+ Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
+ while (iterator.hasNext()) {
+ ItemInfo next = iterator.next();
+ if (next.getStartId() == trackId) {
+ iterator.remove();
+ }
+ }
+ pendingWorkForDirectory.remove(trackId);
+ }
+
+ /**
+ * Mark the inode status as SUCCESS or FAILURE in the map.
+ */
+ private void updateStatus(long startId, boolean isSuccess){
+ StoragePolicySatisfyPathStatusInfo spsStatusInfo =
+ spsStatus.get(startId);
+ if (spsStatusInfo == null) {
+ spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
+ spsStatus.put(startId, spsStatusInfo);
+ }
+
+ if (isSuccess) {
+ spsStatusInfo.setSuccess();
+ } else {
+ spsStatusInfo.setFailure();
+ }
+ }
+
+ /**
+ * Clear all the movements in spsDirsToBeTraveresed/storageMovementNeeded
+ * and remove the SPS xAttrs so that the required resources are cleaned up.
+ */
+ public synchronized void clearQueuesWithNotification() {
+ // Remove xAttr from directories
+ Long trackId;
+ while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
+ try {
+ // Remove xAttr for file
+ namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+ } catch (IOException ie) {
+ LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
+ }
+ }
+
+ // Files are added directly to storageMovementNeeded, so try to remove
+ // the xAttr for each file
+ ItemInfo itemInfo;
+ while ((itemInfo = storageMovementNeeded.poll()) != null) {
+ try {
+ // Remove xAttr for file
+ if (!itemInfo.isDir()) {
+ namesystem.removeXattr(itemInfo.getTrackId(),
+ XATTR_SATISFY_STORAGE_POLICY);
+ }
+ } catch (IOException ie) {
+ LOG.warn(
+ "Failed to remove SPS xattr for track id "
+ + itemInfo.getTrackId(), ie);
+ }
+ }
+ this.clearAll();
+ }
+
+ /**
+ * Take the directory track ID from the spsDirsToBeTraveresed queue and
+ * collect child IDs to process for satisfying the policy.
+ */
+ private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
+ implements Runnable {
+
+ private int remainingCapacity = 0;
+
+ private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
+
+ StorageMovementPendingInodeIdCollector(FSDirectory dir) {
+ super(dir);
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Starting FileInodeIdCollector!.");
+ long lastStatusCleanTime = 0;
+ while (namesystem.isRunning() && sps.isRunning()) {
+ try {
+ if (!namesystem.isInSafeMode()) {
+ FSDirectory fsd = namesystem.getFSDirectory();
+ Long startINodeId = spsDirsToBeTraveresed.poll();
+ if (startINodeId == null) {
+ // Waiting for SPS path
+ synchronized (spsDirsToBeTraveresed) {
+ spsDirsToBeTraveresed.wait(5000);
+ }
+ } else {
+ INode startInode = fsd.getInode(startINodeId);
+ if (startInode != null) {
+ try {
+ remainingCapacity = remainingCapacity();
+ spsStatus.put(startINodeId,
+ new StoragePolicySatisfyPathStatusInfo(
+ StoragePolicySatisfyPathStatus.IN_PROGRESS));
+ readLock();
+ traverseDir(startInode.asDirectory(), startINodeId,
+ HdfsFileStatus.EMPTY_NAME,
+ new SPSTraverseInfo(startINodeId));
+ } finally {
+ readUnlock();
+ }
+ // Mark startInode traversal as done
+ addAll(startInode.getId(), currentBatch, true);
+ currentBatch.clear();
+
+ // Check if the directory was empty and no child was added to the queue
+ DirPendingWorkInfo dirPendingWorkInfo =
+ pendingWorkForDirectory.get(startInode.getId());
+ if (dirPendingWorkInfo.isDirWorkDone()) {
+ namesystem.removeXattr(startInode.getId(),
+ XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(startInode.getId());
+ updateStatus(startInode.getId(), true);
+ }
+ }
+ }
+ // Clear the SPS status if it has been SUCCESS for more than the
+ // clearance time (5 min by default).
+ if (Time.monotonicNow()
+ - lastStatusCleanTime > statusClearanceElapsedTimeMs) {
+ lastStatusCleanTime = Time.monotonicNow();
+ cleanSpsStatus();
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn("Exception while loading inodes to satisfy the policy", t);
+ }
+ }
+ }
+
+ private synchronized void cleanSpsStatus() {
+ for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
+ spsStatus.entrySet().iterator(); it.hasNext();) {
+ Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
+ if (entry.getValue().canRemove()) {
+ it.remove();
+ }
+ }
+ }
+
+ @Override
+ protected void checkPauseForTesting() throws InterruptedException {
+ // TODO implement if needed
+ }
+
+ @Override
+ protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing {} for statisy the policy",
+ inode.getFullPathName());
+ }
+ if (!inode.isFile()) {
+ return false;
+ }
+ if (inode.asFile().numBlocks() != 0) {
+ currentBatch.add(new ItemInfo(
+ ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+ remainingCapacity--;
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean canSubmitCurrentBatch() {
+ return remainingCapacity <= 0;
+ }
+
+ @Override
+ protected void checkINodeReady(long startId) throws IOException {
+ // SPS work won't be scheduled if NN is in standby. So, skipping NN
+ // standby check.
+ return;
+ }
+
+ @Override
+ protected void submitCurrentBatch(long startId)
+ throws IOException, InterruptedException {
+ // Add the current children to the queue
+ addAll(startId, currentBatch, false);
+ currentBatch.clear();
+ }
+
+ @Override
+ protected void throttle() throws InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+ + " waiting for some free slots.");
+ }
+ remainingCapacity = remainingCapacity();
+ // wait for queue to be free
+ while (remainingCapacity <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+ }
+ Thread.sleep(5000);
+ remainingCapacity = remainingCapacity();
+ }
+ }
+
+ @Override
+ protected boolean canTraverseDir(INode inode) throws IOException {
+ return true;
+ }
+ }
+
+ /**
+ * Info for a recursive directory scan.
+ */
+ public static class DirPendingWorkInfo {
+
+ private int pendingWorkCount = 0;
+ private boolean fullyScanned = false;
+ private boolean success = true;
+
+ /**
+ * Increment the pending work count for directory.
+ */
+ public synchronized void addPendingWorkCount(int count) {
+ this.pendingWorkCount = this.pendingWorkCount + count;
+ }
+
+ /**
+ * Decrement the pending work count for the directory once one track info
+ * is completed.
+ */
+ public synchronized void decrementPendingWorkCount() {
+ this.pendingWorkCount--;
+ }
+
+ /**
+ * Return true if all the pending work is done and the directory is fully
+ * scanned, otherwise false.
+ */
+ public synchronized boolean isDirWorkDone() {
+ return (pendingWorkCount <= 0 && fullyScanned);
+ }
+
+ /**
+ * Mark the directory scan as completed.
+ */
+ public synchronized void markScanCompleted() {
+ this.fullyScanned = true;
+ }
+
+ /**
+ * Return true if block movement succeeded for all the files, otherwise
+ * false.
+ */
+ public boolean isPolicySatisfied() {
+ return success;
+ }
+
+ /**
+ * Mark the directory SPS status as failed when failure is true.
+ */
+ public void setFailure(boolean failure) {
+ this.success = this.success && !failure;
+ }
+ }
+
+ public void init() {
+ inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
+ namesystem.getFSDirectory()));
+ inodeIdCollector.setName("FileInodeIdCollector");
+ inodeIdCollector.start();
+ }
+
+ public void close() {
+ if (inodeIdCollector != null) {
+ inodeIdCollector.interrupt();
+ }
+ }
+
+ class SPSTraverseInfo extends TraverseInfo {
+ private long startId;
+
+ SPSTraverseInfo(long startId) {
+ this.startId = startId;
+ }
+
+ public long getStartId() {
+ return startId;
+ }
+ }
+
+ /**
+ * Represent the file/directory block movement status.
+ */
+ static class StoragePolicySatisfyPathStatusInfo {
+ private StoragePolicySatisfyPathStatus status =
+ StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
+ private long lastStatusUpdateTime;
+
+ StoragePolicySatisfyPathStatusInfo() {
+ this.lastStatusUpdateTime = 0;
+ }
+
+ StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) {
+ this.status = status;
+ this.lastStatusUpdateTime = 0;
+ }
+
+ private void setSuccess() {
+ this.status = StoragePolicySatisfyPathStatus.SUCCESS;
+ this.lastStatusUpdateTime = Time.monotonicNow();
+ }
+
+ private void setFailure() {
+ this.status = StoragePolicySatisfyPathStatus.FAILURE;
+ this.lastStatusUpdateTime = Time.monotonicNow();
+ }
+
+ private StoragePolicySatisfyPathStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * Return true if the SUCCESS/FAILURE status has been cached for more than
+ * the status clearance time (5 min by default).
+ */
+ private boolean canRemove() {
+ return (StoragePolicySatisfyPathStatus.SUCCESS == status
+ || StoragePolicySatisfyPathStatus.FAILURE == status)
+ && (Time.monotonicNow()
+ - lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
+ }
+ }
+
+ public StoragePolicySatisfyPathStatus getStatus(long id) {
+ StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id);
+ if (spsStatusInfo == null) {
+ return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
+ }
+ return spsStatusInfo.getStatus();
+ }
+
+ @VisibleForTesting
+ public static void setStatusClearanceElapsedTimeMs(
+ long statusClearanceElapsedTimeMs) {
+ BlockStorageMovementNeeded.statusClearanceElapsedTimeMs =
+ statusClearanceElapsedTimeMs;
+ }
+
+ @VisibleForTesting
+ public static long getStatusClearanceElapsedTimeMs() {
+ return statusClearanceElapsedTimeMs;
+ }
+}
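
The DirPendingWorkInfo bookkeeping above is the heart of the directory tracking: the traverser adds a pending count for every discovered child and marks the scan complete; each file whose blocks finish moving decrements the count, and once it reaches zero with the scan done, the SPS xAttr can be dropped. A minimal standalone sketch of that accounting (plain Java, no Hadoop dependencies; the class and names below are illustrative, not Hadoop APIs):

import java.util.HashMap;
import java.util.Map;

public class PendingWorkSketch {
  // Toy model of DirPendingWorkInfo: counts outstanding children per directory.
  static class DirWork {
    int pending;      // children still to satisfy
    boolean scanDone; // directory traversal finished
    boolean isDone() { return pending <= 0 && scanDone; }
  }

  public static void main(String[] args) {
    Map<Long, DirWork> pendingWorkForDirectory = new HashMap<>();
    long dirId = 16385L; // illustrative inode id

    // The traverser discovers two files under the directory, then finishes.
    DirWork work = pendingWorkForDirectory.computeIfAbsent(dirId, k -> new DirWork());
    work.pending += 2;    // addAll(startId, twoChildren, ...)
    work.scanDone = true; // markScanCompleted()

    // Each file whose blocks finished moving decrements the count.
    work.pending--; // removeItemTrackInfo(file1, true)
    System.out.println("dir done? " + work.isDone()); // false, one child left
    work.pending--; // removeItemTrackInfo(file2, true)
    System.out.println("dir done? " + work.isDone()); // true -> xAttr removed
  }
}
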
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
new file mode 100644
index 0000000..0d4bb19
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -0,0 +1,988 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Setting a storage policy on a file after it has been written only updates
+ * the new storage policy type in the namespace; physical block storage
+ * movement will not happen until the user runs the "Mover Tool" explicitly
+ * for such files. The StoragePolicySatisfier daemon thread is implemented to
+ * address the case where users may want HDFS itself to physically move the
+ * blocks instead of running the mover tool explicitly. Just calling the
+ * client API satisfyStoragePolicy on a file/dir will automatically trigger
+ * the movement of its physical storage locations as expected, in an
+ * asynchronous manner. Here the Namenode will pick the file blocks which are
+ * expected to change their storages, then build the mapping of source block
+ * location and expected storage type and location to move to. After that,
+ * this class will also prepare commands to send to the Datanodes for
+ * processing the physical block movements.
+ */
+@InterfaceAudience.Private
+public class StoragePolicySatisfier implements Runnable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StoragePolicySatisfier.class);
+ private Daemon storagePolicySatisfierThread;
+ private final Namesystem namesystem;
+ private final BlockManager blockManager;
+ private final BlockStorageMovementNeeded storageMovementNeeded;
+ private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
+ private volatile boolean isRunning = false;
+ private int spsWorkMultiplier;
+ private long blockCount = 0L;
+ private int blockMovementMaxRetry;
+ private final Context ctxt;
+
+ /**
+ * An interface for analyzing and assigning the block storage movements to
+ * worker nodes.
+ */
+ // TODO: For now, only the one API required by the sps package has been
+ // added. This interface will be refined via HDFS-12911.
+ public interface Context {
+ int getNumLiveDataNodes();
+ }
+
+ /**
+ * Represents the collective analysis status for all blocks.
+ */
+ private static class BlocksMovingAnalysis {
+
+ enum Status {
+ // The analysis was skipped due to some condition; one such condition is
+ // the block collection being in an incomplete state.
+ ANALYSIS_SKIPPED_FOR_RETRY,
+ // Some or all blocks found a respective target for the storage movement.
+ BLOCKS_TARGETS_PAIRED,
+ // None of the blocks found a respective target for the storage movement.
+ NO_BLOCKS_TARGETS_PAIRED,
+ // No blocks were found that need storage movement.
+ BLOCKS_ALREADY_SATISFIED,
+ // The analysis was skipped due to some condition, for example if no
+ // blocks really exist in the block collection, or if analysis is not
+ // required for EC files with unsuitable storage policies.
+ BLOCKS_TARGET_PAIRING_SKIPPED,
+ // All the reported blocks satisfy the policy, but some of the blocks
+ // have low redundancy.
+ FEW_LOW_REDUNDANCY_BLOCKS
+ }
+
+ private Status status = null;
+ private List<Block> assignedBlocks = null;
+
+ BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
+ this.status = status;
+ this.assignedBlocks = blockMovingInfo;
+ }
+ }
+
+ public StoragePolicySatisfier(final Namesystem namesystem,
+ final BlockManager blkManager, Configuration conf, Context ctxt) {
+ this.namesystem = namesystem;
+ this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
+ this, conf.getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
+ this.blockManager = blkManager;
+ this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
+ conf.getLong(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT),
+ conf.getLong(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
+ storageMovementNeeded);
+ this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
+ this.blockMovementMaxRetry = conf.getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
+ this.ctxt = ctxt;
+ }
+
+ /**
+ * Start the storage policy satisfier daemon thread. Also start the block
+ * storage movements monitor to retry the attempts if needed.
+ */
+ public synchronized void start(boolean reconfigStart) {
+ isRunning = true;
+ if (checkIfMoverRunning()) {
+ isRunning = false;
+ LOG.error(
+ "Stopping StoragePolicySatisfier thread as Mover ID file "
+ + HdfsServerConstants.MOVER_ID_PATH.toString()
+ + " has been opened. Maybe a Mover instance is running!");
+ return;
+ }
+ if (reconfigStart) {
+ LOG.info("Starting StoragePolicySatisfier, as admin requested to "
+ + "start it.");
+ } else {
+ LOG.info("Starting StoragePolicySatisfier.");
+ }
+
+ // Ensure that all the previously submitted block movements (if any) are
+ // stopped in all datanodes.
+ addDropSPSWorkCommandsToAllDNs();
+ storageMovementNeeded.init();
+ storagePolicySatisfierThread = new Daemon(this);
+ storagePolicySatisfierThread.setName("StoragePolicySatisfier");
+ storagePolicySatisfierThread.start();
+ this.storageMovementsMonitor.start();
+ }
+
+ /**
+ * Disables storage policy satisfier by stopping its services.
+ *
+ * @param forceStop
+ * true represents that it should stop SPS service by clearing all
+ * pending SPS work
+ */
+ public synchronized void disable(boolean forceStop) {
+ isRunning = false;
+
+ if (storagePolicySatisfierThread == null) {
+ return;
+ }
+
+ storageMovementNeeded.close();
+
+ storagePolicySatisfierThread.interrupt();
+ this.storageMovementsMonitor.stop();
+ if (forceStop) {
+ storageMovementNeeded.clearQueuesWithNotification();
+ addDropSPSWorkCommandsToAllDNs();
+ } else {
+ LOG.info("Stopping StoragePolicySatisfier.");
+ }
+ }
+
+ /**
+ * Timed wait to stop storage policy satisfier daemon threads.
+ */
+ public synchronized void stopGracefully() {
+ if (isRunning) {
+ disable(true);
+ }
+ this.storageMovementsMonitor.stopGracefully();
+
+ if (storagePolicySatisfierThread == null) {
+ return;
+ }
+ try {
+ storagePolicySatisfierThread.join(3000);
+ } catch (InterruptedException ie) {
+ }
+ }
+
+ /**
+ * Check whether StoragePolicySatisfier is running.
+ * @return true if running
+ */
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ // Return true if a Mover instance is running
+ private boolean checkIfMoverRunning() {
+ String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
+ return namesystem.isFileOpenedForWrite(moverId);
+ }
+
+ /**
+ * Add drop commands to all datanodes to stop performing the satisfier
+ * block movements, if any.
+ */
+ private void addDropSPSWorkCommandsToAllDNs() {
+ this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ }
+
+ @Override
+ public void run() {
+ while (namesystem.isRunning() && isRunning) {
+ try {
+ if (!namesystem.isInSafeMode()) {
+ ItemInfo itemInfo = storageMovementNeeded.get();
+ if (itemInfo != null) {
+ if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
+ LOG.info("Failed to satisfy the policy after "
+ + blockMovementMaxRetry + " retries. Removing inode "
+ + itemInfo.getTrackId() + " from the queue");
+ storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
+ continue;
+ }
+ long trackId = itemInfo.getTrackId();
+ BlockCollection blockCollection;
+ BlocksMovingAnalysis status = null;
+ try {
+ namesystem.readLock();
+ blockCollection = namesystem.getBlockCollection(trackId);
+ // Check blockCollectionId existence.
+ if (blockCollection == null) {
+ // File doesn't exist (maybe it got deleted), remove trackId from
+ // the queue
+ storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
+ } else {
+ status =
+ analyseBlocksStorageMovementsAndAssignToDN(
+ blockCollection);
+ }
+ } finally {
+ namesystem.readUnlock();
+ }
+ if (blockCollection != null) {
+ switch (status.status) {
+ // Just add to monitor, so it will be retried after timeout
+ case ANALYSIS_SKIPPED_FOR_RETRY:
+ // Just add to monitor, so it will be tracked for report and
+ // be removed on storage movement attempt finished report.
+ case BLOCKS_TARGETS_PAIRED:
+ this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
+ .getStartId(), itemInfo.getTrackId(), monotonicNow(),
+ status.assignedBlocks, itemInfo.getRetryCount()));
+ break;
+ case NO_BLOCKS_TARGETS_PAIRED:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding trackID " + trackId
+ + " back to retry queue as none of the blocks"
+ + " found its eligible targets.");
+ }
+ itemInfo.retryCount++;
+ this.storageMovementNeeded.add(itemInfo);
+ break;
+ case FEW_LOW_REDUNDANCY_BLOCKS:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding trackID " + trackId
+ + " back to retry queue as some of the blocks"
+ + " are low redundant.");
+ }
+ this.storageMovementNeeded.add(itemInfo);
+ break;
+ // Just clean Xattrs
+ case BLOCKS_TARGET_PAIRING_SKIPPED:
+ case BLOCKS_ALREADY_SATISFIED:
+ default:
+ LOG.info("Block analysis skipped or blocks already satisfied"
+ + " with storages. So, Cleaning up the Xattrs.");
+ storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
+ break;
+ }
+ }
+ }
+ }
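+ // Throttle: when the queue is drained, or enough blocks have been
+ // scheduled relative to the number of live DNs, pause briefly.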
+ int numLiveDn = ctxt.getNumLiveDataNodes();
+ if (storageMovementNeeded.size() == 0
+ || blockCount > (numLiveDn * spsWorkMultiplier)) {
+ Thread.sleep(3000);
+ blockCount = 0L;
+ }
+ } catch (Throwable t) {
+ handleException(t);
+ }
+ }
+ }
+
+ private void handleException(Throwable t) {
+ // double check to avoid entering into synchronized block.
+ if (isRunning) {
+ synchronized (this) {
+ if (isRunning) {
+ isRunning = false;
+ // Stopping monitor thread and clearing queues as well
+ this.clearQueues();
+ this.storageMovementsMonitor.stopGracefully();
+ if (!namesystem.isRunning()) {
+ LOG.info("Stopping StoragePolicySatisfier.");
+ if (!(t instanceof InterruptedException)) {
+ LOG.info("StoragePolicySatisfier received an exception"
+ + " while shutting down.", t);
+ }
+ return;
+ }
+ }
+ }
+ }
+ LOG.error("StoragePolicySatisfier thread received runtime exception. "
+ + "Stopping Storage policy satisfier work", t);
+ return;
+ }
+
+ private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
+ BlockCollection blockCollection) {
+ BlocksMovingAnalysis.Status status =
+ BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
+ byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
+ BlockStoragePolicy existingStoragePolicy =
+ blockManager.getStoragePolicy(existingStoragePolicyID);
+ if (!blockCollection.getLastBlock().isComplete()) {
+ // Postpone: the file is currently under construction.
+ // So, should we add it back, or leave it to the user?
+ LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ + " this to the next retry iteration", blockCollection.getId());
+ return new BlocksMovingAnalysis(
+ BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
+ new ArrayList<>());
+ }
+
+ BlockInfo[] blocks = blockCollection.getBlocks();
+ if (blocks.length == 0) {
+ LOG.info("BlockCollectionID: {} file is not having any blocks."
+ + " So, skipping the analysis.", blockCollection.getId());
+ return new BlocksMovingAnalysis(
+ BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
+ new ArrayList<>());
+ }
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
+
+ for (int i = 0; i < blocks.length; i++) {
+ BlockInfo blockInfo = blocks[i];
+ List<StorageType> expectedStorageTypes;
+ if (blockInfo.isStriped()) {
+ if (ErasureCodingPolicyManager
+ .checkStoragePolicySuitableForECStripedMode(
+ existingStoragePolicyID)) {
+ expectedStorageTypes = existingStoragePolicy
+ .chooseStorageTypes((short) blockInfo.getCapacity());
+ } else {
+ // Currently we support only a limited set of policies (HOT, COLD,
+ // ALLSSD) for EC striped mode files. SPS will not move the blocks if
+ // the storage policy is not among the EC striped mode supported policies.
+ LOG.warn("The storage policy " + existingStoragePolicy.getName()
+ + " is not suitable for Striped EC files. "
+ + "So, not moving the blocks");
+ return new BlocksMovingAnalysis(
+ BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
+ new ArrayList<>());
+ }
+ } else {
+ expectedStorageTypes = existingStoragePolicy
+ .chooseStorageTypes(blockInfo.getReplication());
+ }
+
+ DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+ StorageType[] storageTypes = new StorageType[storages.length];
+ for (int j = 0; j < storages.length; j++) {
+ DatanodeStorageInfo datanodeStorageInfo = storages[j];
+ StorageType storageType = datanodeStorageInfo.getStorageType();
+ storageTypes[j] = storageType;
+ }
+ List<StorageType> existing =
+ new LinkedList<StorageType>(Arrays.asList(storageTypes));
+ if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+ existing, true)) {
+ boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
+ blockInfo, expectedStorageTypes, existing, storages);
+ if (blocksPaired) {
+ status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
+ } else {
+ // None of the blocks found eligible targets for satisfying the
+ // storage policy.
+ status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
+ }
+ } else {
+ if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
+ status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
+ }
+ }
+ }
+
+ List<Block> assignedBlockIds = new ArrayList<Block>();
+ for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+ // Check that at least one block storage movement has been chosen
+ if (blkMovingInfo.getTarget() != null) {
+ // assign block storage movement task to the target node
+ ((DatanodeDescriptor) blkMovingInfo.getTarget())
+ .addBlocksToMoveStorage(blkMovingInfo);
+ LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
+ assignedBlockIds.add(blkMovingInfo.getBlock());
+ blockCount++;
+ }
+ }
+ return new BlocksMovingAnalysis(status, assignedBlockIds);
+ }
+
+ /**
+ * Compute the list of block moving information corresponding to the given
+ * blockId. This checks whether each block location of the given block
+ * satisfies the expected storage policy. If a block location does not
+ * satisfy the policy, it finds a target node with the expected storage
+ * type to satisfy the storage policy.
+ *
+ * @param blockMovingInfos
+ * - list of block source and target node pair
+ * @param blockInfo
+ * - block details
+ * @param expectedStorageTypes
+ * - list of expected storage type to satisfy the storage policy
+ * @param existing
+ * - list to get existing storage types
+ * @param storages
+ * - available storages
+ * @return false if some of the block locations failed to find target node to
+ * satisfy the storage policy, true otherwise
+ */
+ private boolean computeBlockMovingInfos(
+ List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+ List<StorageType> expectedStorageTypes, List<StorageType> existing,
+ DatanodeStorageInfo[] storages) {
+ boolean foundMatchingTargetNodesForBlock = true;
+ if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+ existing, true)) {
+ List<StorageTypeNodePair> sourceWithStorageMap =
+ new ArrayList<StorageTypeNodePair>();
+ List<DatanodeStorageInfo> existingBlockStorages =
+ new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+ // If the expected type already exists in the source node, local movement
+ // would be possible, so let's find such sources first.
+ Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+ if (checkSourceAndTargetTypeExists(
+ datanodeStorageInfo.getDatanodeDescriptor(), existing,
+ expectedStorageTypes)) {
+ sourceWithStorageMap
+ .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
+ datanodeStorageInfo.getDatanodeDescriptor()));
+ iterator.remove();
+ existing.remove(datanodeStorageInfo.getStorageType());
+ }
+ }
+
+ // Let's find sources for existing types left.
+ for (StorageType existingType : existing) {
+ iterator = existingBlockStorages.iterator();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+ StorageType storageType = datanodeStorageInfo.getStorageType();
+ if (storageType == existingType) {
+ iterator.remove();
+ sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+ datanodeStorageInfo.getDatanodeDescriptor()));
+ break;
+ }
+ }
+ }
+
+ StorageTypeNodeMap locsForExpectedStorageTypes =
+ findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+ foundMatchingTargetNodesForBlock = findSourceAndTargetToMove(
+ blockMovingInfos, blockInfo, sourceWithStorageMap,
+ expectedStorageTypes, locsForExpectedStorageTypes);
+ }
+ return foundMatchingTargetNodesForBlock;
+ }
+
+ /**
+ * Find a good target node for each source node whose block storage was
+ * misplaced.
+ *
+ * @param blockMovingInfos
+ * - list of block source and target node pair
+ * @param blockInfo
+ * - Block
+ * @param sourceWithStorageList
+ * - Source Datanode with storages list
+ * @param expected
+ * - Expecting storages to move
+ * @param locsForExpectedStorageTypes
+ * - Available DNs for expected storage types
+ * @return false if some of the block locations failed to find target node to
+ * satisfy the storage policy
+ */
+ private boolean findSourceAndTargetToMove(
+ List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+ List<StorageTypeNodePair> sourceWithStorageList,
+ List<StorageType> expected,
+ StorageTypeNodeMap locsForExpectedStorageTypes) {
+ boolean foundMatchingTargetNodesForBlock = true;
+ List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
+
+ // Loop over all the source node locations and choose a target storage
+ // within the same node if possible. This is done separately to avoid
+ // choosing a target which already has this block.
+ for (int i = 0; i < sourceWithStorageList.size(); i++) {
+ StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+
+ // Check whether the block replica is already placed in the expected
+ // storage type in this source datanode.
+ if (!expected.contains(existingTypeNodePair.storageType)) {
+ StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
+ blockInfo, existingTypeNodePair.dn, expected);
+ if (chosenTarget != null) {
+ if (blockInfo.isStriped()) {
+ buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+ existingTypeNodePair.storageType, chosenTarget.dn,
+ chosenTarget.storageType, blockMovingInfos);
+ } else {
+ buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+ existingTypeNodePair.storageType, chosenTarget.dn,
+ chosenTarget.storageType, blockMovingInfos);
+ }
+ expected.remove(chosenTarget.storageType);
+ // TODO: We can increment scheduled block count for this node?
+ }
+ }
+ // Add to excludeNodes to avoid choosing this node as a target later
+ excludeNodes.add(existingTypeNodePair.dn);
+ }
+
+ // Loop over all the source node locations. Choose a remote target
+ // storage node if one was not found within the same node.
+ for (int i = 0; i < sourceWithStorageList.size(); i++) {
+ StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+ StorageTypeNodePair chosenTarget = null;
+ // A target storage was already chosen within the same datanode, so just
+ // skip this source node.
+ if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
+ continue;
+ }
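+ // First, try to choose a target in the same node group when the
+ // topology is node-group aware.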
+ if (chosenTarget == null && blockManager.getDatanodeManager()
+ .getNetworkTopology().isNodeGroupAware()) {
+ chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
+ expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
+ excludeNodes);
+ }
+
+ // Then, match nodes on the same rack
+ if (chosenTarget == null) {
+ chosenTarget =
+ chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+ Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
+ }
+
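+ // Finally, fall back to any other node in the cluster.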
+ if (chosenTarget == null) {
+ chosenTarget =
+ chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+ Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
+ }
+ if (null != chosenTarget) {
+ if (blockInfo.isStriped()) {
+ buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+ existingTypeNodePair.storageType, chosenTarget.dn,
+ chosenTarget.storageType, blockMovingInfos);
+ } else {
+ buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+ existingTypeNodePair.storageType, chosenTarget.dn,
+ chosenTarget.storageType, blockMovingInfos);
+ }
+
+ expected.remove(chosenTarget.storageType);
+ excludeNodes.add(chosenTarget.dn);
+ // TODO: We can increment scheduled block count for this node?
+ } else {
+ LOG.warn(
+ "Failed to choose target datanode for the required"
+ + " storage types {}, block:{}, existing storage type:{}",
+ expected, blockInfo, existingTypeNodePair.storageType);
+ }
+ }
+
+ if (expected.size() > 0) {
+ foundMatchingTargetNodesForBlock = false;
+ }
+
+ return foundMatchingTargetNodesForBlock;
+ }
+
+ private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
+ DatanodeDescriptor dn) {
+ for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
+ if (blockMovingInfo.getSource().equals(dn)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
+ DatanodeInfo sourceNode, StorageType sourceStorageType,
+ DatanodeInfo targetNode, StorageType targetStorageType,
+ List<BlockMovingInfo> blkMovingInfos) {
+ Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
+ blockInfo.getGenerationStamp());
+ BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
+ targetNode, sourceStorageType, targetStorageType);
+ blkMovingInfos.add(blkMovingInfo);
+ }
+
+ private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
+ DatanodeInfo sourceNode, StorageType sourceStorageType,
+ DatanodeInfo targetNode, StorageType targetStorageType,
+ List<BlockMovingInfo> blkMovingInfos) {
+ // For a striped block, an internal block needs to be constructed at the
+ // given index of the block group. Here we iterate over all the block
+ // indices and construct internal blocks which can then be considered
+ // for block movement.
+ BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
+ for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
+ if (si.getBlockIndex() >= 0) {
+ DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
+ if (sourceNode.equals(dn)) {
+ // construct internal block
+ long blockId = blockInfo.getBlockId() + si.getBlockIndex();
+ long numBytes = StripedBlockUtil.getInternalBlockLength(
+ sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
+ sBlockInfo.getDataBlockNum(), si.getBlockIndex());
+ Block blk = new Block(blockId, numBytes,
+ blockInfo.getGenerationStamp());
+ BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
+ targetNode, sourceStorageType, targetStorageType);
+ blkMovingInfos.add(blkMovingInfo);
+ }
+ }
+ }
+ }
+
+ /**
+ * Choose the target storage within the same datanode if possible.
+ *
+ * @param block
+ * - block info
+ * @param source
+ * - source datanode
+ * @param targetTypes
+ * - list of target storage types
+ * @return the chosen storage type and node pair, or null if none found
+ */
+ private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
+ DatanodeDescriptor source, List<StorageType> targetTypes) {
+ for (StorageType t : targetTypes) {
+ DatanodeStorageInfo chooseStorage4Block =
+ source.chooseStorage4Block(t, block.getNumBytes());
+ if (chooseStorage4Block != null) {
+ return new StorageTypeNodePair(t, source);
+ }
+ }
+ return null;
+ }
+
+ private StorageTypeNodePair chooseTarget(Block block,
+ DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
+ StorageTypeNodeMap locsForExpectedStorageTypes,
+ List<DatanodeDescriptor> excludeNodes) {
+ for (StorageType t : targetTypes) {
+ List<DatanodeDescriptor> nodesWithStorages =
+ locsForExpectedStorageTypes.getNodesWithStorages(t);
+ if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
+ continue; // no target nodes with the required storage type.
+ }
+ Collections.shuffle(nodesWithStorages);
+ for (DatanodeDescriptor target : nodesWithStorages) {
+ if (!excludeNodes.contains(target) && matcher.match(
+ blockManager.getDatanodeManager().getNetworkTopology(), source,
+ target)) {
+ if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
+ return new StorageTypeNodePair(t, target);
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private static class StorageTypeNodePair {
+ private StorageType storageType = null;
+ private DatanodeDescriptor dn = null;
+
+ StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
+ this.storageType = storageType;
+ this.dn = dn;
+ }
+ }
+
+ private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
+ List<StorageType> expected) {
+ StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
+ List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
+ .getDatanodeListForReport(DatanodeReportType.LIVE);
+ for (DatanodeDescriptor dn : reports) {
+ StorageReport[] storageReports = dn.getStorageReports();
+ for (StorageReport storageReport : storageReports) {
+ StorageType t = storageReport.getStorage().getStorageType();
+ if (expected.contains(t)) {
+ final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
+ if (maxRemaining > 0L) {
+ targetMap.add(t, dn);
+ }
+ }
+ }
+ }
+ return targetMap;
+ }
+
+ private static long getMaxRemaining(StorageReport[] storageReports,
+ StorageType t) {
+ long max = 0L;
+ for (StorageReport r : storageReports) {
+ if (r.getStorage().getStorageType() == t) {
+ if (r.getRemaining() > max) {
+ max = r.getRemaining();
+ }
+ }
+ }
+ return max;
+ }
+
+ private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
+ List<StorageType> existing, List<StorageType> expectedStorageTypes) {
+ DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
+ boolean isExpectedTypeAvailable = false;
+ boolean isExistingTypeAvailable = false;
+ for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
+ StorageType storageType = dnInfo.getStorageType();
+ if (existing.contains(storageType)) {
+ isExistingTypeAvailable = true;
+ }
+ if (expectedStorageTypes.contains(storageType)) {
+ isExpectedTypeAvailable = true;
+ }
+ }
+ return isExistingTypeAvailable && isExpectedTypeAvailable;
+ }
+
+ private static class StorageTypeNodeMap {
+ private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
+ new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
+
+ private void add(StorageType t, DatanodeDescriptor dn) {
+ List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
+ LinkedList<DatanodeDescriptor> value = null;
+ if (nodesWithStorages == null) {
+ value = new LinkedList<DatanodeDescriptor>();
+ value.add(dn);
+ typeNodeMap.put(t, value);
+ } else {
+ nodesWithStorages.add(dn);
+ }
+ }
+
+ /**
+ * @param type
+ * - Storage type
+ * @return datanodes which have the given storage type
+ */
+ private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
+ return typeNodeMap.get(type);
+ }
+ }
+
+ /**
+ * Receives set of storage movement attempt finished blocks report.
+ *
+ * @param moveAttemptFinishedBlks
+ * set of storage movement attempt finished blocks.
+ */
+ public void handleStorageMovementAttemptFinishedBlks(
+ BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
+ return;
+ }
+ storageMovementsMonitor
+ .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
+ }
+
+ @VisibleForTesting
+ BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+ return storageMovementsMonitor;
+ }
+
+ /**
+ * Clear the queues of the storage-movement-needed lists and the items
+ * tracked in the storage movement monitor.
+ */
+ public void clearQueues() {
+ LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
+ + "user requests on satisfying block storages would be discarded.");
+ storageMovementNeeded.clearAll();
+ }
+
+ /**
+ * Queue a file inode whose blocks need storage movement.
+ *
+ * @param inodeId
+ * - file inode/blockcollection id.
+ */
+ public void satisfyStoragePolicy(Long inodeId) {
+ // For a file, startId and trackId are the same
+ storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added track info for inode {} to block "
+ + "storageMovementNeeded queue", inodeId);
+ }
+ }
+
+ public void addInodeToPendingDirQueue(long id) {
+ storageMovementNeeded.addToPendingDirQueue(id);
+ }
+
+ /**
+ * Clear queues for given track id.
+ */
+ public void clearQueue(long trackId) {
+ storageMovementNeeded.clearQueue(trackId);
+ }
+
+ /**
+ * ItemInfo is a file info object for which the policy needs to be
+ * satisfied.
+ */
+ public static class ItemInfo {
+ private long startId;
+ private long trackId;
+ private int retryCount;
+
+ public ItemInfo(long startId, long trackId) {
+ this.startId = startId;
+ this.trackId = trackId;
+ // Set to 0 when the item is added to the queue for the first time.
+ this.retryCount = 0;
+ }
+
+ public ItemInfo(long startId, long trackId, int retryCount) {
+ this.startId = startId;
+ this.trackId = trackId;
+ this.retryCount = retryCount;
+ }
+
+ /**
+ * Return the start inode id of the current track Id.
+ */
+ public long getStartId() {
+ return startId;
+ }
+
+ /**
+ * Return the file inode id for which the policy needs to be satisfied.
+ */
+ public long getTrackId() {
+ return trackId;
+ }
+
+ /**
+ * Returns true if the tracking path is a directory, false otherwise.
+ */
+ public boolean isDir() {
+ return (startId != trackId);
+ }
+
+ /**
+ * Get the attempted retry count of the block for satisfying the policy.
+ */
+ public int getRetryCount() {
+ return retryCount;
+ }
+ }
+
+ /**
+ * This class contains information about attempted blocks and their last
+ * attempted or reported time stamp. This is used by
+ * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
+ */
+ final static class AttemptedItemInfo extends ItemInfo {
+ private long lastAttemptedOrReportedTime;
+ private final List<Block> blocks;
+
+ /**
+ * AttemptedItemInfo constructor.
+ *
+ * @param rootId
+ * rootId for trackId
+ * @param trackId
+ * trackId for file.
+ * @param lastAttemptedOrReportedTime
+ * last attempted or reported time
+ * @param blocks
+ * blocks being moved in this attempt
+ * @param retryCount
+ * number of retry attempts so far
+ */
+ AttemptedItemInfo(long rootId, long trackId,
+ long lastAttemptedOrReportedTime,
+ List<Block> blocks, int retryCount) {
+ super(rootId, trackId, retryCount);
+ this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
+ this.blocks = blocks;
+ }
+
+ /**
+ * @return last attempted or reported time stamp.
+ */
+ long getLastAttemptedOrReportedTime() {
+ return lastAttemptedOrReportedTime;
+ }
+
+ /**
+ * Update lastAttemptedOrReportedTime, so that the expiration time will be
+ * postponed into the future.
+ */
+ void touchLastReportedTimeStamp() {
+ this.lastAttemptedOrReportedTime = monotonicNow();
+ }
+
+ List<Block> getBlocks() {
+ return this.blocks;
+ }
+
+ }
+
+ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
+ String path) throws IOException {
+ INode inode = namesystem.getFSDirectory().getINode(path);
+ return storageMovementNeeded.getStatus(inode.getId());
+ }
+}
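
From the user side, the satisfier is driven by the satisfyStoragePolicy client call mentioned in the class javadoc above. A hedged sketch of how a client might trigger it (this assumes the DistributedFileSystem#satisfyStoragePolicy API from the HDFS-10285 feature branch and a running cluster with SPS enabled; the path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SatisfySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at an HDFS cluster with SPS enabled.
    try (FileSystem fs = FileSystem.get(conf)) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      Path cold = new Path("/archive/2018/logs"); // illustrative path
      // Change the policy in the namespace only...
      dfs.setStoragePolicy(cold, "COLD");
      // ...then ask the NN to move the physical blocks asynchronously.
      dfs.satisfyStoragePolicy(cold);
    }
  }
}

The NameNode side then queues the inode via satisfyStoragePolicy(Long) above, and the daemon pairs blocks with targets asynchronously.
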
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java
new file mode 100644
index 0000000..d1d69fb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides a mechanism for satisfying the storage policy of a
+ * path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
deleted file mode 100644
index d4ccb3e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests that block storage movement attempt failures reported from the DN
- * are processed correctly.
- */
-public class TestBlockStorageMovementAttemptedItems {
-
- private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
- private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
- private final int selfRetryTimeout = 500;
-
- @Before
- public void setup() throws Exception {
- unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
- Mockito.mock(Namesystem.class),
- Mockito.mock(StoragePolicySatisfier.class), 100);
- bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
- selfRetryTimeout, unsatisfiedStorageMovementFiles);
- }
-
- @After
- public void teardown() {
- if (bsmAttemptedItems != null) {
- bsmAttemptedItems.stop();
- bsmAttemptedItems.stopGracefully();
- }
- }
-
- private boolean checkItemMovedForRetry(Long item, long retryTimeout)
- throws InterruptedException {
- long stopTime = monotonicNow() + (retryTimeout * 2);
- boolean isItemFound = false;
- while (monotonicNow() < (stopTime)) {
- ItemInfo ele = null;
- while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
- if (item == ele.getTrackId()) {
- isItemFound = true;
- break;
- }
- }
- if (!isItemFound) {
- Thread.sleep(100);
- } else {
- break;
- }
- }
- return isItemFound;
- }
-
- /**
- * Verify that reporting moved blocks queues up the block info.
- */
- @Test(timeout = 30000)
- public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
- bsmAttemptedItems.start(); // start block movement result monitor thread
- Long item = new Long(1234);
- List<Block> blocks = new ArrayList<Block>();
- blocks.add(new Block(item));
- bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
- Block[] blockArray = new Block[blocks.size()];
- blocks.toArray(blockArray);
- bsmAttemptedItems.addReportedMovedBlocks(blockArray);
- assertEquals("Failed to receive result!", 1,
- bsmAttemptedItems.getMovementFinishedBlocksCount());
- }
-
- /**
- * Verify empty moved blocks reporting queue.
- */
- @Test(timeout = 30000)
- public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
- bsmAttemptedItems.start(); // start block movement report monitor thread
- Long item = new Long(1234);
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
- assertEquals("Shouldn't receive result", 0,
- bsmAttemptedItems.getMovementFinishedBlocksCount());
- assertEquals("Item doesn't exist in the attempted list", 1,
- bsmAttemptedItems.getAttemptedItemsCount());
- }
-
- /**
- * Partial block movement with
- * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, first occurrence
- * is #blockStorageMovementReportedItemsCheck() and then
- * #blocksStorageMovementUnReportedItemsCheck().
- */
- @Test(timeout = 30000)
- public void testPartialBlockMovementShouldBeRetried1() throws Exception {
- Long item = new Long(1234);
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- blocks.add(new Block(5678L));
- Long trackID = 0L;
- bsmAttemptedItems
- .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
- Block[] blksMovementReport = new Block[1];
- blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
-
- // start block movement report monitor thread
- bsmAttemptedItems.start();
- assertTrue("Failed to add to the retry list",
- checkItemMovedForRetry(trackID, 5000));
- assertEquals("Failed to remove from the attempted list", 0,
- bsmAttemptedItems.getAttemptedItemsCount());
- }
-
- /**
- * Partial block movement. Here, first occurrence is
- * #blocksStorageMovementUnReportedItemsCheck() and then
- * #blockStorageMovementReportedItemsCheck().
- */
- @Test(timeout = 30000)
- public void testPartialBlockMovementShouldBeRetried2() throws Exception {
- Long item = new Long(1234);
- Long trackID = 0L;
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- bsmAttemptedItems
- .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
- Block[] blksMovementReport = new Block[1];
- blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
-
- Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
-
- bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
- bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
-
- assertTrue("Failed to add to the retry list",
- checkItemMovedForRetry(trackID, 5000));
- assertEquals("Failed to remove from the attempted list", 0,
- bsmAttemptedItems.getAttemptedItemsCount());
- }
-
- /**
- * Partial block movement with only BlocksStorageMoveAttemptFinished report
- * and storageMovementAttemptedItems list is empty.
- */
- @Test(timeout = 30000)
- public void testPartialBlockMovementWithEmptyAttemptedQueue()
- throws Exception {
- Long item = new Long(1234);
- Long trackID = 0L;
- List<Block> blocks = new ArrayList<>();
- blocks.add(new Block(item));
- bsmAttemptedItems
- .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
- Block[] blksMovementReport = new Block[1];
- blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
- assertFalse(
- "Should not add in queue again if it is not there in"
- + " storageMovementAttemptedItems",
- checkItemMovedForRetry(trackID, 5000));
- assertEquals("Failed to remove from the attempted list", 1,
- bsmAttemptedItems.getAttemptedItemsCount());
- }
-}