You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/07/05 04:17:24 UTC
[31/50] [abbrv] hadoop git commit: HDFS-12955: [SPS]: Move SPS
classes to a separate package. Contributed by Rakesh R.
HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dccccbac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dccccbac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dccccbac
Branch: refs/heads/HDFS-10285
Commit: dccccbac4a8ad82f9290c8430b95e34026942b53
Parents: 82e10f5
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Fri Dec 22 09:10:12 2017 -0800
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Jul 5 08:46:33 2018 +0530
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 6 +-
.../BlockStorageMovementAttemptedItems.java | 241 ---
.../namenode/BlockStorageMovementNeeded.java | 574 ------
.../hdfs/server/namenode/FSNamesystem.java | 1 +
.../hdfs/server/namenode/IntraNNSPSContext.java | 41 +
.../server/namenode/StoragePolicySatisfier.java | 973 ----------
.../sps/BlockStorageMovementAttemptedItems.java | 241 +++
.../sps/BlockStorageMovementNeeded.java | 572 ++++++
.../namenode/sps/StoragePolicySatisfier.java | 988 ++++++++++
.../hdfs/server/namenode/sps/package-info.java | 28 +
.../TestBlockStorageMovementAttemptedItems.java | 196 --
.../namenode/TestStoragePolicySatisfier.java | 1775 -----------------
...stStoragePolicySatisfierWithStripedFile.java | 580 ------
.../TestBlockStorageMovementAttemptedItems.java | 196 ++
.../sps/TestStoragePolicySatisfier.java | 1779 ++++++++++++++++++
...stStoragePolicySatisfierWithStripedFile.java | 580 ++++++
16 files changed, 4430 insertions(+), 4341 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index c81ed6c..1cf687e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,11 +89,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -478,7 +479,8 @@ public class BlockManager implements BlockStatsMXBean {
conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
- sps = new StoragePolicySatisfier(namesystem, this, conf);
+ StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
+ sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
deleted file mode 100644
index 643255f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.util.Daemon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A monitor class for checking whether block storage movements attempt
- * completed or not. If this receives block storage movement attempt
- * status(either success or failure) from DN then it will just remove the
- * entries from tracking. If there is no DN reports about movement attempt
- * finished for a longer time period, then such items will retries automatically
- * after timeout. The default timeout would be 5 minutes.
- */
-public class BlockStorageMovementAttemptedItems {
- private static final Logger LOG =
- LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
-
- /**
- * A map holds the items which are already taken for blocks movements
- * processing and sent to DNs.
- */
- private final List<AttemptedItemInfo> storageMovementAttemptedItems;
- private final List<Block> movementFinishedBlocks;
- private volatile boolean monitorRunning = true;
- private Daemon timerThread = null;
- //
- // It might take anywhere between 5 to 10 minutes before
- // a request is timed out.
- //
- private long selfRetryTimeout = 5 * 60 * 1000;
-
- //
- // It might take anywhere between 1 to 2 minutes before
- // a request is timed out.
- //
- private long minCheckTimeout = 1 * 60 * 1000; // minimum value
- private BlockStorageMovementNeeded blockStorageMovementNeeded;
-
- public BlockStorageMovementAttemptedItems(long recheckTimeout,
- long selfRetryTimeout,
- BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
- if (recheckTimeout > 0) {
- this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
- }
-
- this.selfRetryTimeout = selfRetryTimeout;
- this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
- storageMovementAttemptedItems = new ArrayList<>();
- movementFinishedBlocks = new ArrayList<>();
- }
-
- /**
- * Add item to block storage movement attempted items map which holds the
- * tracking/blockCollection id versus time stamp.
- *
- * @param itemInfo
- * - tracking info
- */
- public void add(AttemptedItemInfo itemInfo) {
- synchronized (storageMovementAttemptedItems) {
- storageMovementAttemptedItems.add(itemInfo);
- }
- }
-
- /**
- * Add the storage movement attempt finished blocks to
- * storageMovementFinishedBlocks.
- *
- * @param moveAttemptFinishedBlks
- * storage movement attempt finished blocks
- */
- public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
- if (moveAttemptFinishedBlks.length == 0) {
- return;
- }
- synchronized (movementFinishedBlocks) {
- movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
- }
- }
-
- /**
- * Starts the monitor thread.
- */
- public synchronized void start() {
- monitorRunning = true;
- timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
- timerThread.setName("BlocksStorageMovementAttemptMonitor");
- timerThread.start();
- }
-
- /**
- * Sets running flag to false. Also, this will interrupt monitor thread and
- * clear all the queued up tasks.
- */
- public synchronized void stop() {
- monitorRunning = false;
- if (timerThread != null) {
- timerThread.interrupt();
- }
- this.clearQueues();
- }
-
- /**
- * Timed wait to stop monitor thread.
- */
- synchronized void stopGracefully() {
- if (timerThread == null) {
- return;
- }
- if (monitorRunning) {
- stop();
- }
- try {
- timerThread.join(3000);
- } catch (InterruptedException ie) {
- }
- }
-
- /**
- * A monitor class for checking block storage movement attempt status and long
- * waiting items periodically.
- */
- private class BlocksStorageMovementAttemptMonitor implements Runnable {
- @Override
- public void run() {
- while (monitorRunning) {
- try {
- blockStorageMovementReportedItemsCheck();
- blocksStorageMovementUnReportedItemsCheck();
- Thread.sleep(minCheckTimeout);
- } catch (InterruptedException ie) {
- LOG.info("BlocksStorageMovementAttemptMonitor thread "
- + "is interrupted.", ie);
- } catch (IOException ie) {
- LOG.warn("BlocksStorageMovementAttemptMonitor thread "
- + "received exception and exiting.", ie);
- }
- }
- }
- }
-
- @VisibleForTesting
- void blocksStorageMovementUnReportedItemsCheck() {
- synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
- .iterator();
- long now = monotonicNow();
- while (iter.hasNext()) {
- AttemptedItemInfo itemInfo = iter.next();
- if (now > itemInfo.getLastAttemptedOrReportedTime()
- + selfRetryTimeout) {
- Long blockCollectionID = itemInfo.getTrackId();
- synchronized (movementFinishedBlocks) {
- ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
- blockCollectionID, itemInfo.getRetryCount() + 1);
- blockStorageMovementNeeded.add(candidate);
- iter.remove();
- LOG.info("TrackID: {} becomes timed out and moved to needed "
- + "retries queue for next iteration.", blockCollectionID);
- }
- }
- }
-
- }
- }
-
- @VisibleForTesting
- void blockStorageMovementReportedItemsCheck() throws IOException {
- synchronized (movementFinishedBlocks) {
- Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
- while (finishedBlksIter.hasNext()) {
- Block blk = finishedBlksIter.next();
- synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
- .iterator();
- while (iterator.hasNext()) {
- AttemptedItemInfo attemptedItemInfo = iterator.next();
- attemptedItemInfo.getBlocks().remove(blk);
- if (attemptedItemInfo.getBlocks().isEmpty()) {
- // TODO: try add this at front of the Queue, so that this element
- // gets the chance first and can be cleaned from queue quickly as
- // all movements already done.
- blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
- .getStartId(), attemptedItemInfo.getTrackId(),
- attemptedItemInfo.getRetryCount() + 1));
- iterator.remove();
- }
- }
- }
- // Remove attempted blocks from movementFinishedBlocks list.
- finishedBlksIter.remove();
- }
- }
- }
-
- @VisibleForTesting
- public int getMovementFinishedBlocksCount() {
- return movementFinishedBlocks.size();
- }
-
- @VisibleForTesting
- public int getAttemptedItemsCount() {
- return storageMovementAttemptedItems.size();
- }
-
- public void clearQueues() {
- movementFinishedBlocks.clear();
- storageMovementAttemptedItems.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
deleted file mode 100644
index 89bcbff..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A Class to track the block collection IDs (Inode's ID) for which physical
- * storage movement needed as per the Namespace and StorageReports from DN.
- * It scan the pending directories for which storage movement is required and
- * schedule the block collection IDs for movement. It track the info of
- * scheduled items and remove the SPS xAttr from the file/Directory once
- * movement is success.
- */
-@InterfaceAudience.Private
-public class BlockStorageMovementNeeded {
-
- public static final Logger LOG =
- LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
-
- private final Queue<ItemInfo> storageMovementNeeded =
- new LinkedList<ItemInfo>();
-
- /**
- * Map of startId and number of child's. Number of child's indicate the
- * number of files pending to satisfy the policy.
- */
- private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
- new HashMap<Long, DirPendingWorkInfo>();
-
- private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
- new ConcurrentHashMap<>();
-
- private final Namesystem namesystem;
-
- // List of pending dir to satisfy the policy
- private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
-
- private final StoragePolicySatisfier sps;
-
- private Daemon inodeIdCollector;
-
- private final int maxQueuedItem;
-
- // Amount of time to cache the SUCCESS status of path before turning it to
- // NOT_AVAILABLE.
- private static long statusClearanceElapsedTimeMs = 300000;
-
- public BlockStorageMovementNeeded(Namesystem namesystem,
- StoragePolicySatisfier sps, int queueLimit) {
- this.namesystem = namesystem;
- this.sps = sps;
- this.maxQueuedItem = queueLimit;
- }
-
- /**
- * Add the candidate to tracking list for which storage movement
- * expected if necessary.
- *
- * @param trackInfo
- * - track info for satisfy the policy
- */
- public synchronized void add(ItemInfo trackInfo) {
- spsStatus.put(trackInfo.getStartId(),
- new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.IN_PROGRESS));
- storageMovementNeeded.add(trackInfo);
- }
-
- /**
- * Add the itemInfo to tracking list for which storage movement
- * expected if necessary.
- * @param startId
- * - start id
- * @param itemInfoList
- * - List of child in the directory
- */
- @VisibleForTesting
- public synchronized void addAll(long startId,
- List<ItemInfo> itemInfoList, boolean scanCompleted) {
- storageMovementNeeded.addAll(itemInfoList);
- DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
- if (pendingWork == null) {
- pendingWork = new DirPendingWorkInfo();
- pendingWorkForDirectory.put(startId, pendingWork);
- }
- pendingWork.addPendingWorkCount(itemInfoList.size());
- if (scanCompleted) {
- pendingWork.markScanCompleted();
- }
- }
-
- /**
- * Gets the block collection id for which storage movements check necessary
- * and make the movement if required.
- *
- * @return block collection ID
- */
- public synchronized ItemInfo get() {
- return storageMovementNeeded.poll();
- }
-
- public synchronized void addToPendingDirQueue(long id) {
- spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.PENDING));
- spsDirsToBeTraveresed.add(id);
- // Notify waiting FileInodeIdCollector thread about the newly
- // added SPS path.
- synchronized (spsDirsToBeTraveresed) {
- spsDirsToBeTraveresed.notify();
- }
- }
-
- /**
- * Returns queue remaining capacity.
- */
- public synchronized int remainingCapacity() {
- int size = storageMovementNeeded.size();
- if (size >= maxQueuedItem) {
- return 0;
- } else {
- return (maxQueuedItem - size);
- }
- }
-
- /**
- * Returns queue size.
- */
- public synchronized int size() {
- return storageMovementNeeded.size();
- }
-
- public synchronized void clearAll() {
- spsDirsToBeTraveresed.clear();
- storageMovementNeeded.clear();
- pendingWorkForDirectory.clear();
- }
-
- /**
- * Decrease the pending child count for directory once one file blocks moved
- * successfully. Remove the SPS xAttr if pending child count is zero.
- */
- public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
- boolean isSuccess) throws IOException {
- if (trackInfo.isDir()) {
- // If track is part of some start inode then reduce the pending
- // directory work count.
- long startId = trackInfo.getStartId();
- INode inode = namesystem.getFSDirectory().getInode(startId);
- if (inode == null) {
- // directory deleted just remove it.
- this.pendingWorkForDirectory.remove(startId);
- updateStatus(startId, isSuccess);
- } else {
- DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
- if (pendingWork != null) {
- pendingWork.decrementPendingWorkCount();
- if (pendingWork.isDirWorkDone()) {
- namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
- pendingWorkForDirectory.remove(startId);
- pendingWork.setFailure(!isSuccess);
- updateStatus(startId, pendingWork.isPolicySatisfied());
- }
- pendingWork.setFailure(isSuccess);
- }
- }
- } else {
- // Remove xAttr if trackID doesn't exist in
- // storageMovementAttemptedItems or file policy satisfied.
- namesystem.removeXattr(trackInfo.getTrackId(),
- XATTR_SATISFY_STORAGE_POLICY);
- updateStatus(trackInfo.getStartId(), isSuccess);
- }
- }
-
- public synchronized void clearQueue(long trackId) {
- spsDirsToBeTraveresed.remove(trackId);
- Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
- while (iterator.hasNext()) {
- ItemInfo next = iterator.next();
- if (next.getStartId() == trackId) {
- iterator.remove();
- }
- }
- pendingWorkForDirectory.remove(trackId);
- }
-
- /**
- * Mark inode status as SUCCESS in map.
- */
- private void updateStatus(long startId, boolean isSuccess){
- StoragePolicySatisfyPathStatusInfo spsStatusInfo =
- spsStatus.get(startId);
- if (spsStatusInfo == null) {
- spsStatusInfo = new StoragePolicySatisfyPathStatusInfo();
- spsStatus.put(startId, spsStatusInfo);
- }
-
- if (isSuccess) {
- spsStatusInfo.setSuccess();
- } else {
- spsStatusInfo.setFailure();
- }
- }
-
- /**
- * Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded
- * and notify to clean up required resources.
- * @throws IOException
- */
- public synchronized void clearQueuesWithNotification() {
- // Remove xAttr from directories
- Long trackId;
- while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
- try {
- // Remove xAttr for file
- namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
- } catch (IOException ie) {
- LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
- }
- }
-
- // File's directly added to storageMovementNeeded, So try to remove
- // xAttr for file
- ItemInfo itemInfo;
- while ((itemInfo = storageMovementNeeded.poll()) != null) {
- try {
- // Remove xAttr for file
- if (!itemInfo.isDir()) {
- namesystem.removeXattr(itemInfo.getTrackId(),
- XATTR_SATISFY_STORAGE_POLICY);
- }
- } catch (IOException ie) {
- LOG.warn(
- "Failed to remove SPS xattr for track id "
- + itemInfo.getTrackId(), ie);
- }
- }
- this.clearAll();
- }
-
- /**
- * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
- * ID's to process for satisfy the policy.
- */
- private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
- implements Runnable {
-
- private int remainingCapacity = 0;
-
- private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
-
- StorageMovementPendingInodeIdCollector(FSDirectory dir) {
- super(dir);
- }
-
- @Override
- public void run() {
- LOG.info("Starting FileInodeIdCollector!.");
- long lastStatusCleanTime = 0;
- while (namesystem.isRunning() && sps.isRunning()) {
- try {
- if (!namesystem.isInSafeMode()) {
- FSDirectory fsd = namesystem.getFSDirectory();
- Long startINodeId = spsDirsToBeTraveresed.poll();
- if (startINodeId == null) {
- // Waiting for SPS path
- synchronized (spsDirsToBeTraveresed) {
- spsDirsToBeTraveresed.wait(5000);
- }
- } else {
- INode startInode = fsd.getInode(startINodeId);
- if (startInode != null) {
- try {
- remainingCapacity = remainingCapacity();
- spsStatus.put(startINodeId,
- new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.IN_PROGRESS));
- readLock();
- traverseDir(startInode.asDirectory(), startINodeId,
- HdfsFileStatus.EMPTY_NAME,
- new SPSTraverseInfo(startINodeId));
- } finally {
- readUnlock();
- }
- // Mark startInode traverse is done
- addAll(startInode.getId(), currentBatch, true);
- currentBatch.clear();
-
- // check if directory was empty and no child added to queue
- DirPendingWorkInfo dirPendingWorkInfo =
- pendingWorkForDirectory.get(startInode.getId());
- if (dirPendingWorkInfo.isDirWorkDone()) {
- namesystem.removeXattr(startInode.getId(),
- XATTR_SATISFY_STORAGE_POLICY);
- pendingWorkForDirectory.remove(startInode.getId());
- updateStatus(startInode.getId(), true);
- }
- }
- }
- //Clear the SPS status if status is in SUCCESS more than 5 min.
- if (Time.monotonicNow()
- - lastStatusCleanTime > statusClearanceElapsedTimeMs) {
- lastStatusCleanTime = Time.monotonicNow();
- cleanSpsStatus();
- }
- }
- } catch (Throwable t) {
- LOG.warn("Exception while loading inodes to satisfy the policy", t);
- }
- }
- }
-
- private synchronized void cleanSpsStatus() {
- for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
- spsStatus.entrySet().iterator(); it.hasNext();) {
- Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
- if (entry.getValue().canRemove()) {
- it.remove();
- }
- }
- }
-
- @Override
- protected void checkPauseForTesting() throws InterruptedException {
- // TODO implement if needed
- }
-
- @Override
- protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
- throws IOException, InterruptedException {
- assert getFSDirectory().hasReadLock();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing {} for statisy the policy",
- inode.getFullPathName());
- }
- if (!inode.isFile()) {
- return false;
- }
- if (inode.isFile() && inode.asFile().numBlocks() != 0) {
- currentBatch.add(new ItemInfo(
- ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
- remainingCapacity--;
- }
- return true;
- }
-
- @Override
- protected boolean canSubmitCurrentBatch() {
- return remainingCapacity <= 0;
- }
-
- @Override
- protected void checkINodeReady(long startId) throws IOException {
- FSNamesystem fsn = ((FSNamesystem) namesystem);
- fsn.checkNameNodeSafeMode("NN is in safe mode,"
- + "cannot satisfy the policy.");
- // SPS work should be cancelled when NN goes to standby. Just
- // double checking for sanity.
- fsn.checkOperation(NameNode.OperationCategory.WRITE);
- }
-
- @Override
- protected void submitCurrentBatch(long startId)
- throws IOException, InterruptedException {
- // Add current child's to queue
- addAll(startId, currentBatch, false);
- currentBatch.clear();
- }
-
- @Override
- protected void throttle() throws InterruptedException {
- assert !getFSDirectory().hasReadLock();
- assert !namesystem.hasReadLock();
- if (LOG.isDebugEnabled()) {
- LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
- + " waiting for some free slots.");
- }
- remainingCapacity = remainingCapacity();
- // wait for queue to be free
- while (remainingCapacity <= 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting for storageMovementNeeded queue to be free!");
- }
- Thread.sleep(5000);
- remainingCapacity = remainingCapacity();
- }
- }
-
- @Override
- protected boolean canTraverseDir(INode inode) throws IOException {
- return true;
- }
- }
-
- /**
- * Info for directory recursive scan.
- */
- public static class DirPendingWorkInfo {
-
- private int pendingWorkCount = 0;
- private boolean fullyScanned = false;
- private boolean success = true;
-
- /**
- * Increment the pending work count for directory.
- */
- public synchronized void addPendingWorkCount(int count) {
- this.pendingWorkCount = this.pendingWorkCount + count;
- }
-
- /**
- * Decrement the pending work count for directory one track info is
- * completed.
- */
- public synchronized void decrementPendingWorkCount() {
- this.pendingWorkCount--;
- }
-
- /**
- * Return true if all the pending work is done and directory fully
- * scanned, otherwise false.
- */
- public synchronized boolean isDirWorkDone() {
- return (pendingWorkCount <= 0 && fullyScanned);
- }
-
- /**
- * Mark directory scan is completed.
- */
- public synchronized void markScanCompleted() {
- this.fullyScanned = true;
- }
-
- /**
- * Return true if all the files block movement is success, otherwise false.
- */
- public boolean isPolicySatisfied() {
- return success;
- }
-
- /**
- * Set directory SPS status failed.
- */
- public void setFailure(boolean failure) {
- this.success = this.success || failure;
- }
- }
-
- public void init() {
- inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
- namesystem.getFSDirectory()));
- inodeIdCollector.setName("FileInodeIdCollector");
- inodeIdCollector.start();
- }
-
- public void close() {
- if (inodeIdCollector != null) {
- inodeIdCollector.interrupt();
- }
- }
-
- class SPSTraverseInfo extends TraverseInfo {
- private long startId;
-
- SPSTraverseInfo(long startId) {
- this.startId = startId;
- }
-
- public long getStartId() {
- return startId;
- }
- }
-
- /**
- * Represent the file/directory block movement status.
- */
- static class StoragePolicySatisfyPathStatusInfo {
- private StoragePolicySatisfyPathStatus status =
- StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- private long lastStatusUpdateTime;
-
- StoragePolicySatisfyPathStatusInfo() {
- this.lastStatusUpdateTime = 0;
- }
-
- StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) {
- this.status = status;
- this.lastStatusUpdateTime = 0;
- }
-
- private void setSuccess() {
- this.status = StoragePolicySatisfyPathStatus.SUCCESS;
- this.lastStatusUpdateTime = Time.monotonicNow();
- }
-
- private void setFailure() {
- this.status = StoragePolicySatisfyPathStatus.FAILURE;
- this.lastStatusUpdateTime = Time.monotonicNow();
- }
-
- private StoragePolicySatisfyPathStatus getStatus() {
- return status;
- }
-
- /**
- * Return true if SUCCESS status cached more then 5 min.
- */
- private boolean canRemove() {
- return (StoragePolicySatisfyPathStatus.SUCCESS == status
- || StoragePolicySatisfyPathStatus.FAILURE == status)
- && (Time.monotonicNow()
- - lastStatusUpdateTime) > statusClearanceElapsedTimeMs;
- }
- }
-
- public StoragePolicySatisfyPathStatus getStatus(long id) {
- StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id);
- if(spsStatusInfo == null){
- return StoragePolicySatisfyPathStatus.NOT_AVAILABLE;
- }
- return spsStatusInfo.getStatus();
- }
-
- @VisibleForTesting
- public static void setStatusClearanceElapsedTimeMs(
- long statusClearanceElapsedTimeMs) {
- BlockStorageMovementNeeded.statusClearanceElapsedTimeMs =
- statusClearanceElapsedTimeMs;
- }
-
- @VisibleForTesting
- public static long getStatusClearanceElapsedTimeMs() {
- return statusClearanceElapsedTimeMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 18a471e..0b30a26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -258,6 +258,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
new file mode 100644
index 0000000..111cabb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+
+/**
+ * This class is the Namenode implementation for analyzing the file blocks which
+ * are expecting to change its storages and assigning the block storage
+ * movements to satisfy the storage policy.
+ */
+// TODO: Now, added one API which is required for sps package. Will refine
+// this interface via HDFS-12911.
+public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
+ private final Namesystem namesystem;
+
+ public IntraNNSPSContext(Namesystem namesystem) {
+ this.namesystem = namesystem;
+ }
+
+ @Override
+ public int getNumLiveDataNodes() {
+ return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
+ .getNumLiveDataNodes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
deleted file mode 100644
index 972e744..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ /dev/null
@@ -1,973 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.server.balancer.Matcher;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
-import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.util.Daemon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Setting storagePolicy on a file after the file write will only update the new
- * storage policy type in Namespace, but physical block storage movement will
- * not happen until user runs "Mover Tool" explicitly for such files. The
- * StoragePolicySatisfier Daemon thread implemented for addressing the case
- * where users may want to physically move the blocks by HDFS itself instead of
- * running mover tool explicitly. Just calling client API to
- * satisfyStoragePolicy on a file/dir will automatically trigger to move its
- * physical storage locations as expected in asynchronous manner. Here Namenode
- * will pick the file blocks which are expecting to change its storages, then it
- * will build the mapping of source block location and expected storage type and
- * location to move. After that this class will also prepare commands to send to
- * Datanode for processing the physical block movements.
- */
-@InterfaceAudience.Private
-public class StoragePolicySatisfier implements Runnable {
- public static final Logger LOG =
- LoggerFactory.getLogger(StoragePolicySatisfier.class);
- private Daemon storagePolicySatisfierThread;
- private final Namesystem namesystem;
- private final BlockManager blockManager;
- private final BlockStorageMovementNeeded storageMovementNeeded;
- private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
- private volatile boolean isRunning = false;
- private int spsWorkMultiplier;
- private long blockCount = 0L;
- private int blockMovementMaxRetry;
- /**
- * Represents the collective analysis status for all blocks.
- */
- private static class BlocksMovingAnalysis {
-
- enum Status {
- // Represents that, the analysis skipped due to some conditions. A such
- // condition is if block collection is in incomplete state.
- ANALYSIS_SKIPPED_FOR_RETRY,
- // Represents that few or all blocks found respective target to do
- // the storage movement.
- BLOCKS_TARGETS_PAIRED,
- // Represents that none of the blocks found respective target to do
- // the storage movement.
- NO_BLOCKS_TARGETS_PAIRED,
- // Represents that, none of the blocks found for block storage movements.
- BLOCKS_ALREADY_SATISFIED,
- // Represents that, the analysis skipped due to some conditions.
- // Example conditions are if no blocks really exists in block collection
- // or
- // if analysis is not required on ec files with unsuitable storage
- // policies
- BLOCKS_TARGET_PAIRING_SKIPPED,
- // Represents that, All the reported blocks are satisfied the policy but
- // some of the blocks are low redundant.
- FEW_LOW_REDUNDANCY_BLOCKS
- }
-
- private Status status = null;
- private List<Block> assignedBlocks = null;
-
- BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
- this.status = status;
- this.assignedBlocks = blockMovingInfo;
- }
- }
-
- public StoragePolicySatisfier(final Namesystem namesystem,
- final BlockManager blkManager, Configuration conf) {
- this.namesystem = namesystem;
- this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
- this, conf.getInt(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
- this.blockManager = blkManager;
- this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
- conf.getLong(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT),
- conf.getLong(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
- storageMovementNeeded);
- this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
- this.blockMovementMaxRetry = conf.getInt(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
- }
-
- /**
- * Start storage policy satisfier demon thread. Also start block storage
- * movements monitor for retry the attempts if needed.
- */
- public synchronized void start(boolean reconfigStart) {
- isRunning = true;
- if (checkIfMoverRunning()) {
- isRunning = false;
- LOG.error(
- "Stopping StoragePolicySatisfier thread " + "as Mover ID file "
- + HdfsServerConstants.MOVER_ID_PATH.toString()
- + " been opened. Maybe a Mover instance is running!");
- return;
- }
- if (reconfigStart) {
- LOG.info("Starting StoragePolicySatisfier, as admin requested to "
- + "start it.");
- } else {
- LOG.info("Starting StoragePolicySatisfier.");
- }
-
- // Ensure that all the previously submitted block movements(if any) have to
- // be stopped in all datanodes.
- addDropSPSWorkCommandsToAllDNs();
- storageMovementNeeded.init();
- storagePolicySatisfierThread = new Daemon(this);
- storagePolicySatisfierThread.setName("StoragePolicySatisfier");
- storagePolicySatisfierThread.start();
- this.storageMovementsMonitor.start();
- }
-
- /**
- * Disables storage policy satisfier by stopping its services.
- *
- * @param forceStop
- * true represents that it should stop SPS service by clearing all
- * pending SPS work
- */
- public synchronized void disable(boolean forceStop) {
- isRunning = false;
-
- if (storagePolicySatisfierThread == null) {
- return;
- }
-
- storageMovementNeeded.close();
-
- storagePolicySatisfierThread.interrupt();
- this.storageMovementsMonitor.stop();
- if (forceStop) {
- storageMovementNeeded.clearQueuesWithNotification();
- addDropSPSWorkCommandsToAllDNs();
- } else {
- LOG.info("Stopping StoragePolicySatisfier.");
- }
- }
-
- /**
- * Timed wait to stop storage policy satisfier daemon threads.
- */
- public synchronized void stopGracefully() {
- if (isRunning) {
- disable(true);
- }
- this.storageMovementsMonitor.stopGracefully();
-
- if (storagePolicySatisfierThread == null) {
- return;
- }
- try {
- storagePolicySatisfierThread.join(3000);
- } catch (InterruptedException ie) {
- }
- }
-
- /**
- * Check whether StoragePolicySatisfier is running.
- * @return true if running
- */
- public boolean isRunning() {
- return isRunning;
- }
-
- // Return true if a Mover instance is running
- private boolean checkIfMoverRunning() {
- String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
- return namesystem.isFileOpenedForWrite(moverId);
- }
-
- /**
- * Adding drop commands to all datanodes to stop performing the satisfier
- * block movements, if any.
- */
- private void addDropSPSWorkCommandsToAllDNs() {
- this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
- }
-
- @Override
- public void run() {
- while (namesystem.isRunning() && isRunning) {
- try {
- if (!namesystem.isInSafeMode()) {
- ItemInfo itemInfo = storageMovementNeeded.get();
- if (itemInfo != null) {
- if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
- LOG.info("Failed to satisfy the policy after "
- + blockMovementMaxRetry + " retries. Removing inode "
- + itemInfo.getTrackId() + " from the queue");
- storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
- continue;
- }
- long trackId = itemInfo.getTrackId();
- BlockCollection blockCollection;
- BlocksMovingAnalysis status = null;
- try {
- namesystem.readLock();
- blockCollection = namesystem.getBlockCollection(trackId);
- // Check blockCollectionId existence.
- if (blockCollection == null) {
- // File doesn't exists (maybe got deleted), remove trackId from
- // the queue
- storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
- } else {
- status =
- analyseBlocksStorageMovementsAndAssignToDN(
- blockCollection);
- }
- } finally {
- namesystem.readUnlock();
- }
- if (blockCollection != null) {
- switch (status.status) {
- // Just add to monitor, so it will be retried after timeout
- case ANALYSIS_SKIPPED_FOR_RETRY:
- // Just add to monitor, so it will be tracked for report and
- // be removed on storage movement attempt finished report.
- case BLOCKS_TARGETS_PAIRED:
- this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
- .getStartId(), itemInfo.getTrackId(), monotonicNow(),
- status.assignedBlocks, itemInfo.getRetryCount()));
- break;
- case NO_BLOCKS_TARGETS_PAIRED:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID " + trackId
- + " back to retry queue as none of the blocks"
- + " found its eligible targets.");
- }
- itemInfo.retryCount++;
- this.storageMovementNeeded.add(itemInfo);
- break;
- case FEW_LOW_REDUNDANCY_BLOCKS:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID " + trackId
- + " back to retry queue as some of the blocks"
- + " are low redundant.");
- }
- this.storageMovementNeeded.add(itemInfo);
- break;
- // Just clean Xattrs
- case BLOCKS_TARGET_PAIRING_SKIPPED:
- case BLOCKS_ALREADY_SATISFIED:
- default:
- LOG.info("Block analysis skipped or blocks already satisfied"
- + " with storages. So, Cleaning up the Xattrs.");
- storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
- break;
- }
- }
- }
- }
- int numLiveDn = namesystem.getFSDirectory().getBlockManager()
- .getDatanodeManager().getNumLiveDataNodes();
- if (storageMovementNeeded.size() == 0
- || blockCount > (numLiveDn * spsWorkMultiplier)) {
- Thread.sleep(3000);
- blockCount = 0L;
- }
- } catch (Throwable t) {
- handleException(t);
- }
- }
- }
-
- private void handleException(Throwable t) {
- // double check to avoid entering into synchronized block.
- if (isRunning) {
- synchronized (this) {
- if (isRunning) {
- isRunning = false;
- // Stopping monitor thread and clearing queues as well
- this.clearQueues();
- this.storageMovementsMonitor.stopGracefully();
- if (!namesystem.isRunning()) {
- LOG.info("Stopping StoragePolicySatisfier.");
- if (!(t instanceof InterruptedException)) {
- LOG.info("StoragePolicySatisfier received an exception"
- + " while shutting down.", t);
- }
- return;
- }
- }
- }
- }
- LOG.error("StoragePolicySatisfier thread received runtime exception. "
- + "Stopping Storage policy satisfier work", t);
- return;
- }
-
- private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
- BlockCollection blockCollection) {
- BlocksMovingAnalysis.Status status =
- BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
- byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
- BlockStoragePolicy existingStoragePolicy =
- blockManager.getStoragePolicy(existingStoragePolicyID);
- if (!blockCollection.getLastBlock().isComplete()) {
- // Postpone, currently file is under construction
- // So, should we add back? or leave it to user
- LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
- + " this to the next retry iteration", blockCollection.getId());
- return new BlocksMovingAnalysis(
- BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
- new ArrayList<>());
- }
-
- BlockInfo[] blocks = blockCollection.getBlocks();
- if (blocks.length == 0) {
- LOG.info("BlockCollectionID: {} file is not having any blocks."
- + " So, skipping the analysis.", blockCollection.getId());
- return new BlocksMovingAnalysis(
- BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
- new ArrayList<>());
- }
- List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
-
- for (int i = 0; i < blocks.length; i++) {
- BlockInfo blockInfo = blocks[i];
- List<StorageType> expectedStorageTypes;
- if (blockInfo.isStriped()) {
- if (ErasureCodingPolicyManager
- .checkStoragePolicySuitableForECStripedMode(
- existingStoragePolicyID)) {
- expectedStorageTypes = existingStoragePolicy
- .chooseStorageTypes((short) blockInfo.getCapacity());
- } else {
- // Currently we support only limited policies (HOT, COLD, ALLSSD)
- // for EC striped mode files. SPS will ignore to move the blocks if
- // the storage policy is not in EC Striped mode supported policies
- LOG.warn("The storage policy " + existingStoragePolicy.getName()
- + " is not suitable for Striped EC files. "
- + "So, ignoring to move the blocks");
- return new BlocksMovingAnalysis(
- BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
- new ArrayList<>());
- }
- } else {
- expectedStorageTypes = existingStoragePolicy
- .chooseStorageTypes(blockInfo.getReplication());
- }
-
- DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
- StorageType[] storageTypes = new StorageType[storages.length];
- for (int j = 0; j < storages.length; j++) {
- DatanodeStorageInfo datanodeStorageInfo = storages[j];
- StorageType storageType = datanodeStorageInfo.getStorageType();
- storageTypes[j] = storageType;
- }
- List<StorageType> existing =
- new LinkedList<StorageType>(Arrays.asList(storageTypes));
- if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
- existing, true)) {
- boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
- blockInfo, expectedStorageTypes, existing, storages);
- if (blocksPaired) {
- status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
- } else {
- // none of the blocks found its eligible targets for satisfying the
- // storage policy.
- status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
- }
- } else {
- if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
- status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
- }
- }
- }
-
- List<Block> assignedBlockIds = new ArrayList<Block>();
- for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- // Check for at least one block storage movement has been chosen
- if (blkMovingInfo.getTarget() != null) {
- // assign block storage movement task to the target node
- ((DatanodeDescriptor) blkMovingInfo.getTarget())
- .addBlocksToMoveStorage(blkMovingInfo);
- LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
- assignedBlockIds.add(blkMovingInfo.getBlock());
- blockCount++;
- }
- }
- return new BlocksMovingAnalysis(status, assignedBlockIds);
- }
-
- /**
- * Compute the list of block moving information corresponding to the given
- * blockId. This will check that each block location of the given block is
- * satisfying the expected storage policy. If block location is not satisfied
- * the policy then find out the target node with the expected storage type to
- * satisfy the storage policy.
- *
- * @param blockMovingInfos
- * - list of block source and target node pair
- * @param blockInfo
- * - block details
- * @param expectedStorageTypes
- * - list of expected storage type to satisfy the storage policy
- * @param existing
- * - list to get existing storage types
- * @param storages
- * - available storages
- * @return false if some of the block locations failed to find target node to
- * satisfy the storage policy, true otherwise
- */
- private boolean computeBlockMovingInfos(
- List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
- List<StorageType> expectedStorageTypes, List<StorageType> existing,
- DatanodeStorageInfo[] storages) {
- boolean foundMatchingTargetNodesForBlock = true;
- if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
- existing, true)) {
- List<StorageTypeNodePair> sourceWithStorageMap =
- new ArrayList<StorageTypeNodePair>();
- List<DatanodeStorageInfo> existingBlockStorages =
- new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
- // if expected type exists in source node already, local movement would be
- // possible, so lets find such sources first.
- Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
- while (iterator.hasNext()) {
- DatanodeStorageInfo datanodeStorageInfo = iterator.next();
- if (checkSourceAndTargetTypeExists(
- datanodeStorageInfo.getDatanodeDescriptor(), existing,
- expectedStorageTypes)) {
- sourceWithStorageMap
- .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
- datanodeStorageInfo.getDatanodeDescriptor()));
- iterator.remove();
- existing.remove(datanodeStorageInfo.getStorageType());
- }
- }
-
- // Let's find sources for existing types left.
- for (StorageType existingType : existing) {
- iterator = existingBlockStorages.iterator();
- while (iterator.hasNext()) {
- DatanodeStorageInfo datanodeStorageInfo = iterator.next();
- StorageType storageType = datanodeStorageInfo.getStorageType();
- if (storageType == existingType) {
- iterator.remove();
- sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
- datanodeStorageInfo.getDatanodeDescriptor()));
- break;
- }
- }
- }
-
- StorageTypeNodeMap locsForExpectedStorageTypes =
- findTargetsForExpectedStorageTypes(expectedStorageTypes);
-
- foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
- blockMovingInfos, blockInfo, sourceWithStorageMap,
- expectedStorageTypes, locsForExpectedStorageTypes);
- }
- return foundMatchingTargetNodesForBlock;
- }
-
- /**
- * Find the good target node for each source node for which block storages was
- * misplaced.
- *
- * @param blockMovingInfos
- * - list of block source and target node pair
- * @param blockInfo
- * - Block
- * @param sourceWithStorageList
- * - Source Datanode with storages list
- * @param expected
- * - Expecting storages to move
- * @param locsForExpectedStorageTypes
- * - Available DNs for expected storage types
- * @return false if some of the block locations failed to find target node to
- * satisfy the storage policy
- */
- private boolean findSourceAndTargetToMove(
- List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
- List<StorageTypeNodePair> sourceWithStorageList,
- List<StorageType> expected,
- StorageTypeNodeMap locsForExpectedStorageTypes) {
- boolean foundMatchingTargetNodesForBlock = true;
- List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
-
- // Looping over all the source node locations and choose the target
- // storage within same node if possible. This is done separately to
- // avoid choosing a target which already has this block.
- for (int i = 0; i < sourceWithStorageList.size(); i++) {
- StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
-
- // Check whether the block replica is already placed in the expected
- // storage type in this source datanode.
- if (!expected.contains(existingTypeNodePair.storageType)) {
- StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
- blockInfo, existingTypeNodePair.dn, expected);
- if (chosenTarget != null) {
- if (blockInfo.isStriped()) {
- buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
- existingTypeNodePair.storageType, chosenTarget.dn,
- chosenTarget.storageType, blockMovingInfos);
- } else {
- buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
- existingTypeNodePair.storageType, chosenTarget.dn,
- chosenTarget.storageType, blockMovingInfos);
- }
- expected.remove(chosenTarget.storageType);
- // TODO: We can increment scheduled block count for this node?
- }
- }
- // To avoid choosing this excludeNodes as targets later
- excludeNodes.add(existingTypeNodePair.dn);
- }
-
- // Looping over all the source node locations. Choose a remote target
- // storage node if it was not found out within same node.
- for (int i = 0; i < sourceWithStorageList.size(); i++) {
- StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
- StorageTypeNodePair chosenTarget = null;
- // Chosen the target storage within same datanode. So just skipping this
- // source node.
- if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
- continue;
- }
- if (chosenTarget == null && blockManager.getDatanodeManager()
- .getNetworkTopology().isNodeGroupAware()) {
- chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
- expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
- excludeNodes);
- }
-
- // Then, match nodes on the same rack
- if (chosenTarget == null) {
- chosenTarget =
- chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
- Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes);
- }
-
- if (chosenTarget == null) {
- chosenTarget =
- chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
- Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
- }
- if (null != chosenTarget) {
- if (blockInfo.isStriped()) {
- buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
- existingTypeNodePair.storageType, chosenTarget.dn,
- chosenTarget.storageType, blockMovingInfos);
- } else {
- buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
- existingTypeNodePair.storageType, chosenTarget.dn,
- chosenTarget.storageType, blockMovingInfos);
- }
-
- expected.remove(chosenTarget.storageType);
- excludeNodes.add(chosenTarget.dn);
- // TODO: We can increment scheduled block count for this node?
- } else {
- LOG.warn(
- "Failed to choose target datanode for the required"
- + " storage types {}, block:{}, existing storage type:{}",
- expected, blockInfo, existingTypeNodePair.storageType);
- }
- }
-
- if (expected.size() > 0) {
- foundMatchingTargetNodesForBlock = false;
- }
-
- return foundMatchingTargetNodesForBlock;
- }
-
- private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
- DatanodeDescriptor dn) {
- for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
- if (blockMovingInfo.getSource().equals(dn)) {
- return true;
- }
- }
- return false;
- }
-
- private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
- DatanodeInfo sourceNode, StorageType sourceStorageType,
- DatanodeInfo targetNode, StorageType targetStorageType,
- List<BlockMovingInfo> blkMovingInfos) {
- Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
- blockInfo.getGenerationStamp());
- BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
- targetNode, sourceStorageType, targetStorageType);
- blkMovingInfos.add(blkMovingInfo);
- }
-
- private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
- DatanodeInfo sourceNode, StorageType sourceStorageType,
- DatanodeInfo targetNode, StorageType targetStorageType,
- List<BlockMovingInfo> blkMovingInfos) {
- // For a striped block, it needs to construct internal block at the given
- // index of a block group. Here it is iterating over all the block indices
- // and construct internal blocks which can be then considered for block
- // movement.
- BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
- for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
- if (si.getBlockIndex() >= 0) {
- DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
- if (sourceNode.equals(dn)) {
- // construct internal block
- long blockId = blockInfo.getBlockId() + si.getBlockIndex();
- long numBytes = StripedBlockUtil.getInternalBlockLength(
- sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
- sBlockInfo.getDataBlockNum(), si.getBlockIndex());
- Block blk = new Block(blockId, numBytes,
- blockInfo.getGenerationStamp());
- BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
- targetNode, sourceStorageType, targetStorageType);
- blkMovingInfos.add(blkMovingInfo);
- }
- }
- }
- }
-
- /**
- * Choose the target storage within same datanode if possible.
- *
- * @param block
- * - block info
- * @param source
- * - source datanode
- * @param targetTypes
- * - list of target storage types
- */
- private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
- DatanodeDescriptor source, List<StorageType> targetTypes) {
- for (StorageType t : targetTypes) {
- DatanodeStorageInfo chooseStorage4Block =
- source.chooseStorage4Block(t, block.getNumBytes());
- if (chooseStorage4Block != null) {
- return new StorageTypeNodePair(t, source);
- }
- }
- return null;
- }
-
- private StorageTypeNodePair chooseTarget(Block block,
- DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
- StorageTypeNodeMap locsForExpectedStorageTypes,
- List<DatanodeDescriptor> excludeNodes) {
- for (StorageType t : targetTypes) {
- List<DatanodeDescriptor> nodesWithStorages =
- locsForExpectedStorageTypes.getNodesWithStorages(t);
- if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
- continue; // no target nodes with the required storage type.
- }
- Collections.shuffle(nodesWithStorages);
- for (DatanodeDescriptor target : nodesWithStorages) {
- if (!excludeNodes.contains(target) && matcher.match(
- blockManager.getDatanodeManager().getNetworkTopology(), source,
- target)) {
- if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
- return new StorageTypeNodePair(t, target);
- }
- }
- }
- }
- return null;
- }
-
- private static class StorageTypeNodePair {
- private StorageType storageType = null;
- private DatanodeDescriptor dn = null;
-
- StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
- this.storageType = storageType;
- this.dn = dn;
- }
- }
-
- private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
- List<StorageType> expected) {
- StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
- List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
- .getDatanodeListForReport(DatanodeReportType.LIVE);
- for (DatanodeDescriptor dn : reports) {
- StorageReport[] storageReports = dn.getStorageReports();
- for (StorageReport storageReport : storageReports) {
- StorageType t = storageReport.getStorage().getStorageType();
- if (expected.contains(t)) {
- final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
- if (maxRemaining > 0L) {
- targetMap.add(t, dn);
- }
- }
- }
- }
- return targetMap;
- }
-
- private static long getMaxRemaining(StorageReport[] storageReports,
- StorageType t) {
- long max = 0L;
- for (StorageReport r : storageReports) {
- if (r.getStorage().getStorageType() == t) {
- if (r.getRemaining() > max) {
- max = r.getRemaining();
- }
- }
- }
- return max;
- }
-
- private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
- List<StorageType> existing, List<StorageType> expectedStorageTypes) {
- DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
- boolean isExpectedTypeAvailable = false;
- boolean isExistingTypeAvailable = false;
- for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
- StorageType storageType = dnInfo.getStorageType();
- if (existing.contains(storageType)) {
- isExistingTypeAvailable = true;
- }
- if (expectedStorageTypes.contains(storageType)) {
- isExpectedTypeAvailable = true;
- }
- }
- return isExistingTypeAvailable && isExpectedTypeAvailable;
- }
-
- private static class StorageTypeNodeMap {
- private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
- new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
-
- private void add(StorageType t, DatanodeDescriptor dn) {
- List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
- LinkedList<DatanodeDescriptor> value = null;
- if (nodesWithStorages == null) {
- value = new LinkedList<DatanodeDescriptor>();
- value.add(dn);
- typeNodeMap.put(t, value);
- } else {
- nodesWithStorages.add(dn);
- }
- }
-
- /**
- * @param type
- * - Storage type
- * @return datanodes which has the given storage type
- */
- private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
- return typeNodeMap.get(type);
- }
- }
-
- /**
- * Receives set of storage movement attempt finished blocks report.
- *
- * @param moveAttemptFinishedBlks
- * set of storage movement attempt finished blocks.
- */
- void handleStorageMovementAttemptFinishedBlks(
- BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
- if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
- return;
- }
- storageMovementsMonitor
- .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
- }
-
- @VisibleForTesting
- BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
- return storageMovementsMonitor;
- }
-
- /**
- * Clear the queues from to be storage movement needed lists and items tracked
- * in storage movement monitor.
- */
- public void clearQueues() {
- LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
- + "user requests on satisfying block storages would be discarded.");
- storageMovementNeeded.clearAll();
- }
-
- /**
- * Set file inode in queue for which storage movement needed for its blocks.
- *
- * @param inodeId
- * - file inode/blockcollection id.
- */
- public void satisfyStoragePolicy(Long inodeId) {
- //For file startId and trackId is same
- storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added track info for inode {} to block "
- + "storageMovementNeeded queue", inodeId);
- }
- }
-
- public void addInodeToPendingDirQueue(long id) {
- storageMovementNeeded.addToPendingDirQueue(id);
- }
-
- /**
- * Clear queues for given track id.
- */
- public void clearQueue(long trackId) {
- storageMovementNeeded.clearQueue(trackId);
- }
-
- /**
- * ItemInfo is a file info object for which need to satisfy the
- * policy.
- */
- public static class ItemInfo {
- private long startId;
- private long trackId;
- private int retryCount;
-
- public ItemInfo(long startId, long trackId) {
- this.startId = startId;
- this.trackId = trackId;
- //set 0 when item is getting added first time in queue.
- this.retryCount = 0;
- }
-
- public ItemInfo(long startId, long trackId, int retryCount) {
- this.startId = startId;
- this.trackId = trackId;
- this.retryCount = retryCount;
- }
-
- /**
- * Return the start inode id of the current track Id.
- */
- public long getStartId() {
- return startId;
- }
-
- /**
- * Return the File inode Id for which needs to satisfy the policy.
- */
- public long getTrackId() {
- return trackId;
- }
-
- /**
- * Returns true if the tracking path is a directory, false otherwise.
- */
- public boolean isDir() {
- return (startId != trackId);
- }
-
- /**
- * Get the attempted retry count of the block for satisfy the policy.
- */
- public int getRetryCount() {
- return retryCount;
- }
- }
-
- /**
- * This class contains information of an attempted blocks and its last
- * attempted or reported time stamp. This is used by
- * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
- */
- final static class AttemptedItemInfo extends ItemInfo {
- private long lastAttemptedOrReportedTime;
- private final List<Block> blocks;
-
- /**
- * AttemptedItemInfo constructor.
- *
- * @param rootId
- * rootId for trackId
- * @param trackId
- * trackId for file.
- * @param lastAttemptedOrReportedTime
- * last attempted or reported time
- */
- AttemptedItemInfo(long rootId, long trackId,
- long lastAttemptedOrReportedTime,
- List<Block> blocks, int retryCount) {
- super(rootId, trackId, retryCount);
- this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
- this.blocks = blocks;
- }
-
- /**
- * @return last attempted or reported time stamp.
- */
- long getLastAttemptedOrReportedTime() {
- return lastAttemptedOrReportedTime;
- }
-
- /**
- * Update lastAttemptedOrReportedTime, so that the expiration time will be
- * postponed to future.
- */
- void touchLastReportedTimeStamp() {
- this.lastAttemptedOrReportedTime = monotonicNow();
- }
-
- List<Block> getBlocks() {
- return this.blocks;
- }
-
- }
-
- public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException {
- INode inode = namesystem.getFSDirectory().getINode(path);
- return storageMovementNeeded.getStatus(inode.getId());
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dccccbac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..b044f30
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A monitor class for checking whether block storage movements attempt
+ * completed or not. If this receives block storage movement attempt
+ * status(either success or failure) from DN then it will just remove the
+ * entries from tracking. If there is no DN reports about movement attempt
+ * finished for a longer time period, then such items will retries automatically
+ * after timeout. The default timeout would be 5 minutes.
+ */
+public class BlockStorageMovementAttemptedItems {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
+
+ /**
+ * A map holds the items which are already taken for blocks movements
+ * processing and sent to DNs.
+ */
+ private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+ private final List<Block> movementFinishedBlocks;
+ private volatile boolean monitorRunning = true;
+ private Daemon timerThread = null;
+ //
+ // It might take anywhere between 5 to 10 minutes before
+ // a request is timed out.
+ //
+ private long selfRetryTimeout = 5 * 60 * 1000;
+
+ //
+ // It might take anywhere between 1 to 2 minutes before
+ // a request is timed out.
+ //
+ private long minCheckTimeout = 1 * 60 * 1000; // minimum value
+ private BlockStorageMovementNeeded blockStorageMovementNeeded;
+
+ public BlockStorageMovementAttemptedItems(long recheckTimeout,
+ long selfRetryTimeout,
+ BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+ if (recheckTimeout > 0) {
+ this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
+ }
+
+ this.selfRetryTimeout = selfRetryTimeout;
+ this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
+ storageMovementAttemptedItems = new ArrayList<>();
+ movementFinishedBlocks = new ArrayList<>();
+ }
+
+ /**
+ * Add item to block storage movement attempted items map which holds the
+ * tracking/blockCollection id versus time stamp.
+ *
+ * @param itemInfo
+ * - tracking info
+ */
+ public void add(AttemptedItemInfo itemInfo) {
+ synchronized (storageMovementAttemptedItems) {
+ storageMovementAttemptedItems.add(itemInfo);
+ }
+ }
+
+ /**
+ * Add the storage movement attempt finished blocks to
+ * storageMovementFinishedBlocks.
+ *
+ * @param moveAttemptFinishedBlks
+ * storage movement attempt finished blocks
+ */
+ public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks.length == 0) {
+ return;
+ }
+ synchronized (movementFinishedBlocks) {
+ movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
+ }
+ }
+
+ /**
+ * Starts the monitor thread.
+ */
+ public synchronized void start() {
+ monitorRunning = true;
+ timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
+ timerThread.setName("BlocksStorageMovementAttemptMonitor");
+ timerThread.start();
+ }
+
+ /**
+ * Sets running flag to false. Also, this will interrupt monitor thread and
+ * clear all the queued up tasks.
+ */
+ public synchronized void stop() {
+ monitorRunning = false;
+ if (timerThread != null) {
+ timerThread.interrupt();
+ }
+ this.clearQueues();
+ }
+
+ /**
+ * Timed wait to stop monitor thread.
+ */
+ synchronized void stopGracefully() {
+ if (timerThread == null) {
+ return;
+ }
+ if (monitorRunning) {
+ stop();
+ }
+ try {
+ timerThread.join(3000);
+ } catch (InterruptedException ie) {
+ }
+ }
+
+ /**
+ * A monitor class for checking block storage movement attempt status and long
+ * waiting items periodically.
+ */
+ private class BlocksStorageMovementAttemptMonitor implements Runnable {
+ @Override
+ public void run() {
+ while (monitorRunning) {
+ try {
+ blockStorageMovementReportedItemsCheck();
+ blocksStorageMovementUnReportedItemsCheck();
+ Thread.sleep(minCheckTimeout);
+ } catch (InterruptedException ie) {
+ LOG.info("BlocksStorageMovementAttemptMonitor thread "
+ + "is interrupted.", ie);
+ } catch (IOException ie) {
+ LOG.warn("BlocksStorageMovementAttemptMonitor thread "
+ + "received exception and exiting.", ie);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void blocksStorageMovementUnReportedItemsCheck() {
+ synchronized (storageMovementAttemptedItems) {
+ Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+ .iterator();
+ long now = monotonicNow();
+ while (iter.hasNext()) {
+ AttemptedItemInfo itemInfo = iter.next();
+ if (now > itemInfo.getLastAttemptedOrReportedTime()
+ + selfRetryTimeout) {
+ Long blockCollectionID = itemInfo.getTrackId();
+ synchronized (movementFinishedBlocks) {
+ ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
+ blockCollectionID, itemInfo.getRetryCount() + 1);
+ blockStorageMovementNeeded.add(candidate);
+ iter.remove();
+ LOG.info("TrackID: {} becomes timed out and moved to needed "
+ + "retries queue for next iteration.", blockCollectionID);
+ }
+ }
+ }
+
+ }
+ }
+
+ @VisibleForTesting
+ void blockStorageMovementReportedItemsCheck() throws IOException {
+ synchronized (movementFinishedBlocks) {
+ Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
+ while (finishedBlksIter.hasNext()) {
+ Block blk = finishedBlksIter.next();
+ synchronized (storageMovementAttemptedItems) {
+ Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
+ .iterator();
+ while (iterator.hasNext()) {
+ AttemptedItemInfo attemptedItemInfo = iterator.next();
+ attemptedItemInfo.getBlocks().remove(blk);
+ if (attemptedItemInfo.getBlocks().isEmpty()) {
+ // TODO: try add this at front of the Queue, so that this element
+ // gets the chance first and can be cleaned from queue quickly as
+ // all movements already done.
+ blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
+ .getStartId(), attemptedItemInfo.getTrackId(),
+ attemptedItemInfo.getRetryCount() + 1));
+ iterator.remove();
+ }
+ }
+ }
+ // Remove attempted blocks from movementFinishedBlocks list.
+ finishedBlksIter.remove();
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public int getMovementFinishedBlocksCount() {
+ return movementFinishedBlocks.size();
+ }
+
+ @VisibleForTesting
+ public int getAttemptedItemsCount() {
+ return storageMovementAttemptedItems.size();
+ }
+
+ public void clearQueues() {
+ movementFinishedBlocks.clear();
+ storageMovementAttemptedItems.clear();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org