You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 20:26:58 UTC
[46/50] [abbrv] hadoop git commit: HDFS-13381 : [SPS]: Use
DFSUtilClient#makePathFromFileId() to prepare satisfier file path.
Contributed by Rakesh R.
HDFS-13381 : [SPS]: Use DFSUtilClient#makePathFromFileId() to prepare satisfier file path. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df0af615
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df0af615
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df0af615
Branch: refs/heads/HDFS-10285
Commit: df0af615a52f390ce5958fc2bd5a965ed83f2489
Parents: 9e2f018
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Mon Jul 2 17:22:00 2018 -0700
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Fri Aug 10 13:23:05 2018 -0700
----------------------------------------------------------------------
.../NamenodeProtocolServerSideTranslatorPB.java | 2 +-
.../NamenodeProtocolTranslatorPB.java | 2 +-
.../server/blockmanagement/BlockManager.java | 2 +-
.../hdfs/server/namenode/FSNamesystem.java | 11 ---
.../hdfs/server/namenode/NameNodeRpcServer.java | 8 +-
.../hadoop/hdfs/server/namenode/Namesystem.java | 9 ---
.../sps/BlockStorageMovementAttemptedItems.java | 72 +++++++----------
.../sps/BlockStorageMovementNeeded.java | 61 ++++++--------
.../hdfs/server/namenode/sps/Context.java | 45 ++++++++---
.../namenode/sps/DatanodeCacheManager.java | 4 +-
.../hdfs/server/namenode/sps/FileCollector.java | 13 +--
.../namenode/sps/IntraSPSNameNodeContext.java | 54 +++++++++----
.../sps/IntraSPSNameNodeFileIdCollector.java | 14 ++--
.../hdfs/server/namenode/sps/ItemInfo.java | 34 ++++----
.../hdfs/server/namenode/sps/SPSService.java | 31 +++----
.../namenode/sps/StoragePolicySatisfier.java | 61 +++++---------
.../sps/StoragePolicySatisfyManager.java | 20 ++---
.../hdfs/server/protocol/NamenodeProtocol.java | 2 +-
.../sps/ExternalSPSBlockMoveTaskHandler.java | 4 +-
.../hdfs/server/sps/ExternalSPSContext.java | 85 ++++++++++++++++----
.../sps/ExternalSPSFilePathCollector.java | 36 +++++----
.../sps/ExternalStoragePolicySatisfier.java | 30 +------
.../src/main/proto/NamenodeProtocol.proto | 2 +-
.../TestBlockStorageMovementAttemptedItems.java | 16 ++--
.../sps/TestStoragePolicySatisfier.java | 66 +++++----------
...stStoragePolicySatisfierWithStripedFile.java | 41 ++++------
.../sps/TestExternalStoragePolicySatisfier.java | 35 +++-----
27 files changed, 346 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index e4283c6..d9367fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -265,7 +265,7 @@ public class NamenodeProtocolServerSideTranslatorPB implements
RpcController controller, GetNextSPSPathRequestProto request)
throws ServiceException {
try {
- String nextSPSPath = impl.getNextSPSPath();
+ Long nextSPSPath = impl.getNextSPSPath();
if (nextSPSPath == null) {
return GetNextSPSPathResponseProto.newBuilder().build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 97dee9b..3bd5986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -267,7 +267,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
}
@Override
- public String getNextSPSPath() throws IOException {
+ public Long getNextSPSPath() throws IOException {
GetNextSPSPathRequestProto req =
GetNextSPSPathRequestProto.newBuilder().build();
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 42e246c..bae6b4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3897,7 +3897,7 @@ public class BlockManager implements BlockStatsMXBean {
private void notifyStorageMovementAttemptFinishedBlk(
DatanodeStorageInfo storageInfo, Block block) {
if (getSPSManager() != null) {
- SPSService<Long> sps = getSPSManager().getInternalSPSService();
+ SPSService sps = getSPSManager().getInternalSPSService();
if (sps.isRunning()) {
sps.notifyStorageMovementAttemptFinishedBlk(
storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e1ceecd..afe9092 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3202,17 +3202,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return stat;
}
- @Override
- public String getFilePath(Long inodeId) {
- readLock();
- try {
- INode inode = getFSDirectory().getInode(inodeId);
- return inode == null ? null : inode.getFullPathName();
- } finally {
- readUnlock();
- }
- }
-
/**
* Returns true if the file is closed
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57e827d..2f3325f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2569,7 +2569,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
- public String getNextSPSPath() throws IOException {
+ public Long getNextSPSPath() throws IOException {
checkNNStartup();
String operationName = "getNextSPSPath";
namesystem.checkSuperuserPrivilege(operationName);
@@ -2589,10 +2589,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
throw new IOException("SPS service mode is " + spsMode + ", so "
+ "external SPS service is not allowed to fetch the path Ids");
}
- Long pathId = spsMgr.getNextPathId();
- if (pathId == null) {
- return null;
- }
- return namesystem.getFilePath(pathId);
+ return namesystem.getBlockManager().getSPSManager().getNextPathId();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index fc933b7..82af4d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -77,13 +77,4 @@ public interface Namesystem extends RwLock, SafeMode {
*/
HdfsFileStatus getFileInfo(String filePath, boolean resolveLink,
boolean needLocation) throws IOException;
-
- /**
- * Gets the file path corresponds to the given file id.
- *
- * @param inodeId
- * file id
- * @return string file path
- */
- String getFilePath(Long inodeId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index 5b25491..df4f0dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -52,13 +52,8 @@ import com.google.common.annotations.VisibleForTesting;
* entries from tracking. If there is no DN reports about movement attempt
* finished for a longer time period, then such items will retries automatically
* after timeout. The default timeout would be 5 minutes.
- *
- * @param <T>
- * is identifier of inode or full path name of inode. Internal sps will
- * use the file inodeId for the block movement. External sps will use
- * file string path representation for the block movement.
*/
-public class BlockStorageMovementAttemptedItems<T> {
+public class BlockStorageMovementAttemptedItems {
private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@@ -66,14 +61,14 @@ public class BlockStorageMovementAttemptedItems<T> {
* A map holds the items which are already taken for blocks movements
* processing and sent to DNs.
*/
- private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
+ private final List<AttemptedItemInfo> storageMovementAttemptedItems;
private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs;
// Maintains separate Queue to keep the movement finished blocks. This Q
// is used to update the storageMovementAttemptedItems list asynchronously.
private final BlockingQueue<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
- private BlockMovementListener blkMovementListener;
+ private final Context context;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
@@ -85,12 +80,12 @@ public class BlockStorageMovementAttemptedItems<T> {
// a request is timed out.
//
private long minCheckTimeout = 1 * 60 * 1000; // minimum value
- private BlockStorageMovementNeeded<T> blockStorageMovementNeeded;
- private final SPSService<T> service;
+ private BlockStorageMovementNeeded blockStorageMovementNeeded;
+ private final SPSService service;
- public BlockStorageMovementAttemptedItems(SPSService<T> service,
- BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles,
- BlockMovementListener blockMovementListener) {
+ public BlockStorageMovementAttemptedItems(SPSService service,
+ BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+ Context context) {
this.service = service;
long recheckTimeout = this.service.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
@@ -106,19 +101,27 @@ public class BlockStorageMovementAttemptedItems<T> {
storageMovementAttemptedItems = new ArrayList<>();
scheduledBlkLocs = new HashMap<>();
movementFinishedBlocks = new LinkedBlockingQueue<>();
- this.blkMovementListener = blockMovementListener;
+ this.context = context;
}
/**
* Add item to block storage movement attempted items map which holds the
* tracking/blockCollection id versus time stamp.
*
- * @param itemInfo
- * - tracking info
+ * @param startPathId
+ * - start satisfier path identifier
+ * @param fileId
+ * - file identifier
+ * @param monotonicNow
+ * - time now
+ * @param assignedBlocks
+ * - assigned blocks for block movement
+ * @param retryCount
+ * - retry count
*/
- public void add(T startPath, T file, long monotonicNow,
+ public void add(long startPathId, long fileId, long monotonicNow,
Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) {
- AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file,
+ AttemptedItemInfo itemInfo = new AttemptedItemInfo(startPathId, fileId,
monotonicNow, assignedBlocks.keySet(), retryCount);
synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.add(itemInfo);
@@ -161,11 +164,9 @@ public class BlockStorageMovementAttemptedItems<T> {
boolean foundType = dn.getStorageType().equals(type);
if (foundDn && foundType) {
blkLocs.remove(dn);
- // listener if it is plugged-in
- if (blkMovementListener != null) {
- blkMovementListener
- .notifyMovementTriedBlocks(new Block[] {reportedBlock});
- }
+ Block[] mFinishedBlocks = new Block[1];
+ mFinishedBlocks[0] = reportedBlock;
+ context.notifyMovementTriedBlocks(mFinishedBlocks);
// All the block locations has reported.
if (blkLocs.size() <= 0) {
movementFinishedBlocks.add(reportedBlock);
@@ -244,15 +245,15 @@ public class BlockStorageMovementAttemptedItems<T> {
@VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems
+ Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
.iterator();
long now = monotonicNow();
while (iter.hasNext()) {
- AttemptedItemInfo<T> itemInfo = iter.next();
+ AttemptedItemInfo itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
- T file = itemInfo.getFile();
- ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file,
+ long file = itemInfo.getFile();
+ ItemInfo candidate = new ItemInfo(itemInfo.getStartPath(), file,
itemInfo.getRetryCount() + 1);
blockStorageMovementNeeded.add(candidate);
iter.remove();
@@ -272,13 +273,13 @@ public class BlockStorageMovementAttemptedItems<T> {
// Update attempted items list
for (Block blk : finishedBlks) {
synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems
+ Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
.iterator();
while (iterator.hasNext()) {
- AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
+ AttemptedItemInfo attemptedItemInfo = iterator.next();
attemptedItemInfo.getBlocks().remove(blk);
if (attemptedItemInfo.getBlocks().isEmpty()) {
- blockStorageMovementNeeded.add(new ItemInfo<T>(
+ blockStorageMovementNeeded.add(new ItemInfo(
attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(),
attemptedItemInfo.getRetryCount() + 1));
iterator.remove();
@@ -309,15 +310,4 @@ public class BlockStorageMovementAttemptedItems<T> {
scheduledBlkLocs.clear();
}
}
-
- /**
- * Sets external listener for testing.
- *
- * @param blkMoveListener
- * block movement listener callback object
- */
- @VisibleForTesting
- void setBlockMovementListener(BlockMovementListener blkMoveListener) {
- this.blkMovementListener = blkMoveListener;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index a194876..c95dcda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -43,47 +43,38 @@ import com.google.common.annotations.VisibleForTesting;
* schedule the block collection IDs for movement. It track the info of
* scheduled items and remove the SPS xAttr from the file/Directory once
* movement is success.
- *
- * @param <T>
- * is identifier of inode or full path name of inode. Internal sps will
- * use the file inodeId for the block movement. External sps will use
- * file string path representation for the block movement.
*/
@InterfaceAudience.Private
-public class BlockStorageMovementNeeded<T> {
+public class BlockStorageMovementNeeded {
public static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
- private final Queue<ItemInfo<T>> storageMovementNeeded =
- new LinkedList<ItemInfo<T>>();
+ private final Queue<ItemInfo> storageMovementNeeded =
+ new LinkedList<ItemInfo>();
/**
* Map of startPath and number of child's. Number of child's indicate the
* number of files pending to satisfy the policy.
*/
- private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory =
+ private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
new HashMap<>();
- private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus =
+ private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>();
- private final Context<T> ctxt;
+ private final Context ctxt;
private Daemon pathIdCollector;
- private FileCollector<T> fileCollector;
-
private SPSPathIdProcessor pathIDProcessor;
// Amount of time to cache the SUCCESS status of path before turning it to
// NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000;
- public BlockStorageMovementNeeded(Context<T> context,
- FileCollector<T> fileCollector) {
+ public BlockStorageMovementNeeded(Context context) {
this.ctxt = context;
- this.fileCollector = fileCollector;
pathIDProcessor = new SPSPathIdProcessor();
}
@@ -94,7 +85,7 @@ public class BlockStorageMovementNeeded<T> {
* @param trackInfo
* - track info for satisfy the policy
*/
- public synchronized void add(ItemInfo<T> trackInfo) {
+ public synchronized void add(ItemInfo trackInfo) {
spsStatus.put(trackInfo.getFile(),
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
@@ -114,7 +105,7 @@ public class BlockStorageMovementNeeded<T> {
* scan.
*/
@VisibleForTesting
- public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList,
+ public synchronized void addAll(long startPath, List<ItemInfo> itemInfoList,
boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
@@ -131,7 +122,7 @@ public class BlockStorageMovementNeeded<T> {
* elements to scan.
*/
@VisibleForTesting
- public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) {
+ public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
storageMovementNeeded.add(itemInfo);
// This represents sps start id is file, so no need to update pending dir
// stats.
@@ -141,7 +132,7 @@ public class BlockStorageMovementNeeded<T> {
updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
}
- private void updatePendingDirScanStats(T startPath, int numScannedFiles,
+ private void updatePendingDirScanStats(long startPath, int numScannedFiles,
boolean scanCompleted) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
if (pendingWork == null) {
@@ -160,7 +151,7 @@ public class BlockStorageMovementNeeded<T> {
*
* @return satisfier files
*/
- public synchronized ItemInfo<T> get() {
+ public synchronized ItemInfo get() {
return storageMovementNeeded.poll();
}
@@ -181,12 +172,12 @@ public class BlockStorageMovementNeeded<T> {
* Decrease the pending child count for directory once one file blocks moved
* successfully. Remove the SPS xAttr if pending child count is zero.
*/
- public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo,
+ public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
boolean isSuccess) throws IOException {
if (trackInfo.isDir()) {
// If track is part of some start inode then reduce the pending
// directory work count.
- T startId = trackInfo.getStartPath();
+ long startId = trackInfo.getStartPath();
if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId);
@@ -212,11 +203,11 @@ public class BlockStorageMovementNeeded<T> {
}
}
- public synchronized void clearQueue(T trackId) {
+ public synchronized void clearQueue(long trackId) {
ctxt.removeSPSPathId(trackId);
- Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator();
+ Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
- ItemInfo<T> next = iterator.next();
+ ItemInfo next = iterator.next();
if (next.getFile() == trackId) {
iterator.remove();
}
@@ -227,7 +218,7 @@ public class BlockStorageMovementNeeded<T> {
/**
* Mark inode status as SUCCESS in map.
*/
- private void updateStatus(T startId, boolean isSuccess){
+ private void updateStatus(long startId, boolean isSuccess){
StoragePolicySatisfyPathStatusInfo spsStatusInfo =
spsStatus.get(startId);
if (spsStatusInfo == null) {
@@ -249,7 +240,7 @@ public class BlockStorageMovementNeeded<T> {
*/
public synchronized void clearQueuesWithNotification() {
// Remove xAttr from directories
- T trackId;
+ Long trackId;
while ((trackId = ctxt.getNextSPSPath()) != null) {
try {
// Remove xAttr for file
@@ -261,7 +252,7 @@ public class BlockStorageMovementNeeded<T> {
// File's directly added to storageMovementNeeded, So try to remove
// xAttr for file
- ItemInfo<T> itemInfo;
+ ItemInfo itemInfo;
while ((itemInfo = get()) != null) {
try {
// Remove xAttr for file
@@ -287,7 +278,7 @@ public class BlockStorageMovementNeeded<T> {
public void run() {
LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0;
- T startINode = null;
+ Long startINode = null;
while (ctxt.isRunning()) {
try {
if (!ctxt.isInSafeMode()) {
@@ -301,7 +292,7 @@ public class BlockStorageMovementNeeded<T> {
spsStatus.put(startINode,
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
- fileCollector.scanAndCollectFiles(startINode);
+ ctxt.scanAndCollectFiles(startINode);
// check if directory was empty and no child added to queue
DirPendingWorkInfo dirPendingWorkInfo =
pendingWorkForDirectory.get(startINode);
@@ -339,9 +330,9 @@ public class BlockStorageMovementNeeded<T> {
}
private synchronized void cleanSPSStatus() {
- for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus
- .entrySet().iterator(); it.hasNext();) {
- Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next();
+ for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
+ spsStatus.entrySet().iterator(); it.hasNext();) {
+ Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
if (entry.getValue().canRemove()) {
it.remove();
}
@@ -477,7 +468,7 @@ public class BlockStorageMovementNeeded<T> {
return statusClearanceElapsedTimeMs;
}
- public void markScanCompletedForDir(T inode) {
+ public void markScanCompletedForDir(long inode) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
if (pendingWork != null) {
pendingWork.markScanCompleted();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index 55a1f7a..d538374 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -24,24 +24,21 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
/**
* An interface for the communication between SPS and Namenode module.
- *
- * @param <T>
- * is identifier of inode or full path name of inode. Internal sps will
- * use the file inodeId for the block movement. External sps will use
- * file string path representation for the block movement.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface Context<T> {
+public interface Context {
/**
* Returns true if the SPS is running, false otherwise.
@@ -85,7 +82,7 @@ public interface Context<T> {
* - file info
* @return true if the given file exists, false otherwise.
*/
- boolean isFileExist(T filePath);
+ boolean isFileExist(long filePath);
/**
* Gets the storage policy details for the given policy ID.
@@ -108,7 +105,7 @@ public interface Context<T> {
* - user invoked satisfier path
* @throws IOException
*/
- void removeSPSHint(T spsPath) throws IOException;
+ void removeSPSHint(long spsPath) throws IOException;
/**
* Gets the number of live datanodes in the cluster.
@@ -124,7 +121,7 @@ public interface Context<T> {
* file path
* @return file status metadata information
*/
- HdfsFileStatus getFileInfo(T file) throws IOException;
+ HdfsFileStatus getFileInfo(long file) throws IOException;
/**
* Returns all the live datanodes and its storage details.
@@ -137,15 +134,41 @@ public interface Context<T> {
/**
* @return next SPS path info to process.
*/
- T getNextSPSPath();
+ Long getNextSPSPath();
/**
* Removes the SPS path id.
*/
- void removeSPSPathId(T pathId);
+ void removeSPSPathId(long pathId);
/**
* Removes all SPS path ids.
*/
void removeAllSPSPathIds();
+
+ /**
+ * Do scan and collects the files under that directory and adds to the given
+ * BlockStorageMovementNeeded.
+ *
+ * @param filePath
+ * file path
+ */
+ void scanAndCollectFiles(long filePath)
+ throws IOException, InterruptedException;
+
+ /**
+ * Handles the block move tasks. BlockMovingInfo must contain the required
+ * info to move the block, that source location, destination location and
+ * storage types.
+ */
+ void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException;
+
+ /**
+ * This can be used to notify to the SPS about block movement attempt
+ * finished. Then SPS will re-check whether it needs retry or not.
+ *
+ * @param moveAttemptFinishedBlks
+ * list of movement attempt finished blocks
+ */
+ void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
index 3531ecd..d4e514b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/DatanodeCacheManager.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
* interval.
*/
@InterfaceAudience.Private
-public class DatanodeCacheManager<T> {
+public class DatanodeCacheManager {
private static final Logger LOG = LoggerFactory
.getLogger(DatanodeCacheManager.class);
@@ -78,7 +78,7 @@ public class DatanodeCacheManager<T> {
* @throws IOException
*/
public DatanodeMap getLiveDatanodeStorageReport(
- Context<T> spsContext) throws IOException {
+ Context spsContext) throws IOException {
long now = Time.monotonicNow();
long elapsedTimeMs = now - lastAccessedTime;
boolean refreshNeeded = elapsedTimeMs >= refreshIntervalMs;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
index dceb5fa..fa8b31b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
@@ -26,23 +26,18 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* An interface for scanning the directory recursively and collect files
* under the given directory.
- *
- * @param <T>
- * is identifier of inode or full path name of inode. Internal sps will
- * use the file inodeId for the block movement. External sps will use
- * file string path representation for the block movement.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface FileCollector<T> {
+public interface FileCollector {
/**
* This method can be used to scan and collects the files under that
* directory and adds to the given BlockStorageMovementNeeded.
*
- * @param filePath
- * - file path
+ * @param path
+ * - file path id
*/
- void scanAndCollectFiles(T filePath)
+ void scanAndCollectFiles(long path)
throws IOException, InterruptedException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index a77fe85..2bf4810 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
+import java.util.Arrays;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -32,6 +35,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.net.NetworkTopology;
@@ -45,20 +49,26 @@ import org.slf4j.LoggerFactory;
* movements to satisfy the storage policy.
*/
@InterfaceAudience.Private
-public class IntraSPSNameNodeContext implements Context<Long> {
+public class IntraSPSNameNodeContext implements Context {
private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class);
private final Namesystem namesystem;
private final BlockManager blockManager;
- private SPSService<Long> service;
+ private SPSService service;
+ private final FileCollector fileCollector;
+ private final BlockMoveTaskHandler blockMoveTaskHandler;
public IntraSPSNameNodeContext(Namesystem namesystem,
- BlockManager blockManager, SPSService<Long> service) {
+ BlockManager blockManager, SPSService service) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.service = service;
+ fileCollector = new IntraSPSNameNodeFileIdCollector(
+ namesystem.getFSDirectory(), service);
+ blockMoveTaskHandler = new IntraSPSNameNodeBlockMoveTaskHandler(
+ blockManager, namesystem);
}
@Override
@@ -67,17 +77,12 @@ public class IntraSPSNameNodeContext implements Context<Long> {
}
/**
- * @return object containing information regarding the file or null if file
- * not found.
+ * @return object containing information regarding the file.
*/
@Override
- public HdfsFileStatus getFileInfo(Long inodeID) throws IOException {
- String filePath = namesystem.getFilePath(inodeID);
- if (StringUtils.isBlank(filePath)) {
- LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
- return null;
- }
- return namesystem.getFileInfo(filePath, true, true);
+ public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+ Path filePath = DFSUtilClient.makePathFromFileId(inodeID);
+ return namesystem.getFileInfo(filePath.toString(), true, true);
}
@Override
@@ -93,12 +98,12 @@ public class IntraSPSNameNodeContext implements Context<Long> {
}
@Override
- public boolean isFileExist(Long inodeId) {
+ public boolean isFileExist(long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null;
}
@Override
- public void removeSPSHint(Long inodeId) throws IOException {
+ public void removeSPSHint(long inodeId) throws IOException {
this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
}
@@ -156,7 +161,7 @@ public class IntraSPSNameNodeContext implements Context<Long> {
}
@Override
- public void removeSPSPathId(Long trackId) {
+ public void removeSPSPathId(long trackId) {
blockManager.getSPSManager().removePathId(trackId);
}
@@ -164,4 +169,21 @@ public class IntraSPSNameNodeContext implements Context<Long> {
public void removeAllSPSPathIds() {
blockManager.getSPSManager().removeAllPathIds();
}
+
+ @Override
+ public void scanAndCollectFiles(long filePath)
+ throws IOException, InterruptedException {
+ fileCollector.scanAndCollectFiles(filePath);
+ }
+
+ @Override
+ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
+ blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
+ }
+
+ @Override
+ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+ LOG.info("Movement attempted blocks: {}",
+ Arrays.asList(moveAttemptFinishedBlks));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index 27d9e7d..ea3b96f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -35,16 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
- implements FileCollector<Long> {
+ implements FileCollector {
private int maxQueueLimitToScan;
- private final SPSService <Long> service;
+ private final SPSService service;
private int remainingCapacity = 0;
- private List<ItemInfo<Long>> currentBatch;
+ private List<ItemInfo> currentBatch;
public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
- SPSService<Long> service) {
+ SPSService service) {
super(dir);
this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt(
@@ -64,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
return false;
}
if (inode.isFile() && inode.asFile().numBlocks() != 0) {
- currentBatch.add(new ItemInfo<Long>(
+ currentBatch.add(new ItemInfo(
((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
remainingCapacity--;
}
@@ -120,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
}
@Override
- public void scanAndCollectFiles(final Long startINodeId)
+ public void scanAndCollectFiles(final long startINodeId)
throws IOException, InterruptedException {
FSDirectory fsd = getFSDirectory();
INode startInode = fsd.getInode(startINodeId);
@@ -131,7 +131,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
}
if (startInode.isFile()) {
currentBatch
- .add(new ItemInfo<Long>(startInode.getId(), startInode.getId()));
+ .add(new ItemInfo(startInode.getId(), startInode.getId()));
} else {
readLock();
// NOTE: this lock will not be held for full directory scanning. It is
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
index bd8ab92..949e3fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
@@ -21,28 +21,26 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
- * ItemInfo is a file info object for which need to satisfy the policy. For
- * internal satisfier service, it uses inode id which is Long datatype. For the
- * external satisfier service, it uses the full string representation of the
- * path.
+ * ItemInfo is a file info object for which need to satisfy the policy.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class ItemInfo<T> {
- private T startPath;
- private T file;
+public class ItemInfo {
+ private long startPathId;
+ private long fileId;
private int retryCount;
- public ItemInfo(T startPath, T file) {
- this.startPath = startPath;
- this.file = file;
+ public ItemInfo(long startPathId, long fileId) {
+ this.startPathId = startPathId;
+ this.fileId = fileId;
// set 0 when item is getting added first time in queue.
this.retryCount = 0;
}
- public ItemInfo(final T startPath, final T file, final int retryCount) {
- this.startPath = startPath;
- this.file = file;
+ public ItemInfo(final long startPathId, final long fileId,
+ final int retryCount) {
+ this.startPathId = startPathId;
+ this.fileId = fileId;
this.retryCount = retryCount;
}
@@ -50,22 +48,22 @@ public class ItemInfo<T> {
* Returns the start path of the current file. This indicates that SPS
* was invoked on this path.
*/
- public T getStartPath() {
- return startPath;
+ public long getStartPath() {
+ return startPathId;
}
/**
* Returns the file for which needs to satisfy the policy.
*/
- public T getFile() {
- return file;
+ public long getFile() {
+ return fileId;
}
/**
* Returns true if the tracking path is a directory, false otherwise.
*/
public boolean isDir() {
- return !startPath.equals(file);
+ return !(startPathId == fileId);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 5032377..86634d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -29,15 +29,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
/**
* An interface for SPSService, which exposes life cycle and processing APIs.
- *
- * @param <T>
- * is identifier of inode or full path name of inode. Internal sps will
- * use the file inodeId for the block movement. External sps will use
- * file string path representation for the block movement.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface SPSService<T> {
+public interface SPSService {
/**
* Initializes the helper services.
@@ -45,16 +40,8 @@ public interface SPSService<T> {
* @param ctxt
* - context is an helper service to provide communication channel
* between NN and SPS
- * @param fileCollector
- * - a helper service for scanning the files under a given directory
- * id
- * @param handler
- * - a helper service for moving the blocks
- * @param blkMovementListener
- * - listener to know about block movement attempt completion
*/
- void init(Context<T> ctxt, FileCollector<T> fileCollector,
- BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
+ void init(Context ctxt);
/**
* Starts the SPS service. Make sure to initialize the helper services before
@@ -94,19 +81,19 @@ public interface SPSService<T> {
* @param itemInfo
* file info object for which need to satisfy the policy
*/
- void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted);
+ void addFileToProcess(ItemInfo itemInfo, boolean scanCompleted);
/**
* Adds all the Item information(file etc) to processing queue.
*
- * @param startPath
- * - directory/file, on which SPS was called.
+ * @param startPathId
+ * - directoryId/fileId, on which SPS was called.
* @param itemInfoList
* - list of item infos
* @param scanCompleted
* - whether the scanning of directory fully done with itemInfoList
*/
- void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
+ void addAllFilesToProcess(long startPathId, List<ItemInfo> itemInfoList,
boolean scanCompleted);
/**
@@ -117,7 +104,7 @@ public interface SPSService<T> {
/**
* Clear inodeId present in the processing queue.
*/
- void clearQueue(T spsPath);
+ void clearQueue(long spsPath);
/**
* @return the configuration.
@@ -128,9 +115,9 @@ public interface SPSService<T> {
* Marks the scanning of directory if finished.
*
* @param spsPath
- * - satisfier path
+ * - satisfier path id
*/
- void markScanCompletedForPath(T spsPath);
+ void markScanCompletedForPath(long spsPath);
/**
* Given node is reporting that it received a certain movement attempt
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index cbd6001..4af6c8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -78,20 +78,19 @@ import com.google.common.base.Preconditions;
* physical block movements.
*/
@InterfaceAudience.Private
-public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
+public class StoragePolicySatisfier implements SPSService, Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
- private BlockStorageMovementNeeded<T> storageMovementNeeded;
- private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
+ private BlockStorageMovementNeeded storageMovementNeeded;
+ private BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
private int spsWorkMultiplier;
private long blockCount = 0L;
private int blockMovementMaxRetry;
- private Context<T> ctxt;
- private BlockMoveTaskHandler blockMoveTaskHandler;
+ private Context ctxt;
private final Configuration conf;
- private DatanodeCacheManager<T> dnCacheMgr;
+ private DatanodeCacheManager dnCacheMgr;
public StoragePolicySatisfier(Configuration conf) {
this.conf = conf;
@@ -137,16 +136,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
}
- public void init(final Context<T> context,
- final FileCollector<T> fileIDCollector,
- final BlockMoveTaskHandler blockMovementTaskHandler,
- final BlockMovementListener blockMovementListener) {
+ public void init(final Context context) {
this.ctxt = context;
- this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context,
- fileIDCollector);
- this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>(
- this, storageMovementNeeded, blockMovementListener);
- this.blockMoveTaskHandler = blockMovementTaskHandler;
+ this.storageMovementNeeded = new BlockStorageMovementNeeded(context);
+ this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
+ this, storageMovementNeeded, context);
this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
@@ -191,7 +185,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
storagePolicySatisfierThread.start();
this.storageMovementsMonitor.start();
this.storageMovementNeeded.activate();
- dnCacheMgr = new DatanodeCacheManager<T>(conf);
+ dnCacheMgr = new DatanodeCacheManager(conf);
}
@Override
@@ -259,7 +253,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
continue;
}
try {
- ItemInfo<T> itemInfo = null;
+ ItemInfo itemInfo = null;
boolean retryItem = false;
if (!ctxt.isInSafeMode()) {
itemInfo = storageMovementNeeded.get();
@@ -271,7 +265,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
continue;
}
- T trackId = itemInfo.getFile();
+ long trackId = itemInfo.getFile();
BlocksMovingAnalysis status = null;
BlockStoragePolicy existingStoragePolicy;
// TODO: presently, context internally acquire the lock
@@ -353,7 +347,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
blockCount = 0L;
}
if (retryItem) {
- itemInfo.increRetryCount();
+ // itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
}
} catch (IOException e) {
@@ -469,7 +463,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
try {
- blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
+ ctxt.submitMoveTask(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
StorageTypeNodePair nodeStorage = new StorageTypeNodePair(
blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget());
@@ -1092,7 +1086,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
@VisibleForTesting
- public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() {
+ public BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
@@ -1109,7 +1103,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
/**
* Clear queues for given track id.
*/
- public void clearQueue(T trackId) {
+ public void clearQueue(long trackId) {
storageMovementNeeded.clearQueue(trackId);
}
@@ -1118,7 +1112,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
* attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
- final static class AttemptedItemInfo<T> extends ItemInfo<T> {
+ final static class AttemptedItemInfo extends ItemInfo {
private long lastAttemptedOrReportedTime;
private final Set<Block> blocks;
@@ -1136,7 +1130,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
* @param retryCount
* file retry count
*/
- AttemptedItemInfo(T rootId, T trackId,
+ AttemptedItemInfo(long rootId, long trackId,
long lastAttemptedOrReportedTime,
Set<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount);
@@ -1179,7 +1173,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
@Override
- public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) {
+ public void addFileToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted);
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
@@ -1188,7 +1182,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
@Override
- public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
+ public void addAllFilesToProcess(long startPath, List<ItemInfo> itemInfoList,
boolean scanCompleted) {
getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
}
@@ -1204,12 +1198,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
}
@VisibleForTesting
- public BlockStorageMovementNeeded<T> getStorageMovementQueue() {
+ public BlockStorageMovementNeeded getStorageMovementQueue() {
return storageMovementNeeded;
}
@Override
- public void markScanCompletedForPath(T inodeId) {
+ public void markScanCompletedForPath(long inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId);
}
@@ -1278,15 +1272,4 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
"It should be a positive, non-zero integer value.");
return spsWorkMultiplier;
}
-
- /**
- * Sets external listener for testing.
- *
- * @param blkMovementListener
- * block movement listener callback object
- */
- @VisibleForTesting
- void setBlockMovementListener(BlockMovementListener blkMovementListener) {
- storageMovementsMonitor.setBlockMovementListener(blkMovementListener);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
index 5ec0372..0507d6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
public class StoragePolicySatisfyManager {
private static final Logger LOG = LoggerFactory
.getLogger(StoragePolicySatisfyManager.class);
- private final StoragePolicySatisfier<Long> spsService;
+ private final StoragePolicySatisfier spsService;
private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode;
private final Queue<Long> pathsToBeTraveresed;
@@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
pathsToBeTraveresed = new LinkedList<Long>();
// instantiate SPS service by just keeps config reference and not starting
// any supporting threads.
- spsService = new StoragePolicySatisfier<Long>(conf);
+ spsService = new StoragePolicySatisfier(conf);
this.namesystem = namesystem;
this.blkMgr = blkMgr;
}
@@ -121,10 +121,7 @@ public class StoragePolicySatisfyManager {
}
// starts internal daemon service inside namenode
spsService.init(
- new IntraSPSNameNodeContext(namesystem, blkMgr, spsService),
- new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(),
- spsService),
- new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null);
+ new IntraSPSNameNodeContext(namesystem, blkMgr, spsService));
spsService.start(false, mode);
break;
case EXTERNAL:
@@ -221,13 +218,8 @@ public class StoragePolicySatisfyManager {
mode);
return;
}
- spsService.init(
- new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService),
- new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
- spsService),
- new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr,
- this.namesystem),
- null);
+ spsService.init(new IntraSPSNameNodeContext(this.namesystem, this.blkMgr,
+ spsService));
spsService.start(true, newMode);
break;
case EXTERNAL:
@@ -309,7 +301,7 @@ public class StoragePolicySatisfyManager {
/**
* @return internal SPS service instance.
*/
- public SPSService<Long> getInternalSPSService() {
+ public SPSService getInternalSPSService() {
return this.spsService;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 5ff6ffd..f80477b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -209,6 +209,6 @@ public interface NamenodeProtocol {
* by External SPS.
*/
@AtMostOnce
- String getNextSPSPath() throws IOException;
+ Long getNextSPSPath() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index f5225d2..3ea0294 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -76,11 +76,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
private final SaslDataTransferClient saslClient;
private final BlockStorageMovementTracker blkMovementTracker;
private Daemon movementTrackerThread;
- private final SPSService<String> service;
+ private final SPSService service;
private final BlockDispatcher blkDispatcher;
public ExternalSPSBlockMoveTaskHandler(Configuration conf,
- NameNodeConnector nnc, SPSService<String> spsService) {
+ NameNodeConnector nnc, SPSService spsService) {
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index 1cd4664..189bc2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -27,6 +28,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -34,10 +37,14 @@ import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeMap;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.DatanodeWithStorage;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
@@ -49,17 +56,24 @@ import org.slf4j.LoggerFactory;
* SPS from Namenode state.
*/
@InterfaceAudience.Private
-public class ExternalSPSContext implements Context<String> {
- public static final Logger LOG =
- LoggerFactory.getLogger(ExternalSPSContext.class);
- private SPSService<String> service;
- private NameNodeConnector nnc = null;
- private BlockStoragePolicySuite createDefaultSuite =
+public class ExternalSPSContext implements Context {
+ public static final Logger LOG = LoggerFactory
+ .getLogger(ExternalSPSContext.class);
+ private final SPSService service;
+ private final NameNodeConnector nnc;
+ private final BlockStoragePolicySuite createDefaultSuite =
BlockStoragePolicySuite.createDefaultSuite();
+ private final FileCollector fileCollector;
+ private final BlockMoveTaskHandler externalHandler;
+ private final BlockMovementListener blkMovementListener;
- public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) {
+ public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
this.service = service;
this.nnc = nnc;
+ this.fileCollector = new ExternalSPSFilePathCollector(service);
+ this.externalHandler = new ExternalSPSBlockMoveTaskHandler(
+ service.getConf(), nnc, service);
+ this.blkMovementListener = new ExternalBlockMovementListener();
}
@Override
@@ -119,9 +133,10 @@ public class ExternalSPSContext implements Context<String> {
}
@Override
- public boolean isFileExist(String filePath) {
+ public boolean isFileExist(long path) {
+ Path filePath = DFSUtilClient.makePathFromFileId(path);
try {
- return nnc.getDistributedFileSystem().exists(new Path(filePath));
+ return nnc.getDistributedFileSystem().exists(filePath);
} catch (IllegalArgumentException | IOException e) {
LOG.warn("Exception while getting file is for the given path:{}",
filePath, e);
@@ -140,8 +155,9 @@ public class ExternalSPSContext implements Context<String> {
}
@Override
- public void removeSPSHint(String inodeId) throws IOException {
- nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId),
+ public void removeSPSHint(long inodeId) throws IOException {
+ Path filePath = DFSUtilClient.makePathFromFileId(inodeId);
+ nnc.getDistributedFileSystem().removeXAttr(filePath,
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
}
@@ -157,11 +173,12 @@ public class ExternalSPSContext implements Context<String> {
}
@Override
- public HdfsFileStatus getFileInfo(String path) throws IOException {
+ public HdfsFileStatus getFileInfo(long path) throws IOException {
HdfsLocatedFileStatus fileInfo = null;
try {
+ Path filePath = DFSUtilClient.makePathFromFileId(path);
fileInfo = nnc.getDistributedFileSystem().getClient()
- .getLocatedFileInfo(path, false);
+ .getLocatedFileInfo(filePath.toString(), false);
} catch (FileNotFoundException e) {
LOG.debug("Path:{} doesn't exists!", path, e);
}
@@ -175,7 +192,7 @@ public class ExternalSPSContext implements Context<String> {
}
@Override
- public String getNextSPSPath() {
+ public Long getNextSPSPath() {
try {
return nnc.getNNProtocolConnection().getNextSPSPath();
} catch (IOException e) {
@@ -185,7 +202,7 @@ public class ExternalSPSContext implements Context<String> {
}
@Override
- public void removeSPSPathId(String pathId) {
+ public void removeSPSPathId(long pathId) {
// We need not specifically implement for external.
}
@@ -193,4 +210,40 @@ public class ExternalSPSContext implements Context<String> {
public void removeAllSPSPathIds() {
// We need not specifically implement for external.
}
-}
+
+ @Override
+ public void scanAndCollectFiles(long path)
+ throws IOException, InterruptedException {
+ fileCollector.scanAndCollectFiles(path);
+ }
+
+ @Override
+ public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
+ externalHandler.submitMoveTask(blkMovingInfo);
+ }
+
+ @Override
+ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+ // External listener if it is plugged-in
+ if (blkMovementListener != null) {
+ blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
+ }
+ }
+
+ /**
+ * Its an implementation of BlockMovementListener.
+ */
+ private static class ExternalBlockMovementListener
+ implements BlockMovementListener {
+
+ private List<Block> actualBlockMovements = new ArrayList<>();
+
+ @Override
+ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+ for (Block block : moveAttemptFinishedBlks) {
+ actualBlockMovements.add(block);
+ }
+ LOG.info("Movement attempted blocks", actualBlockMovements);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
index 9435475..611ff65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFilePathCollector.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -41,14 +42,14 @@ import org.slf4j.LoggerFactory;
* representation.
*/
@InterfaceAudience.Private
-public class ExternalSPSFilePathCollector implements FileCollector <String>{
+public class ExternalSPSFilePathCollector implements FileCollector {
public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSFilePathCollector.class);
private DistributedFileSystem dfs;
- private SPSService<String> service;
+ private SPSService service;
private int maxQueueLimitToScan;
- public ExternalSPSFilePathCollector(SPSService<String> service) {
+ public ExternalSPSFilePathCollector(SPSService service) {
this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
@@ -72,13 +73,13 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
* Recursively scan the given path and add the file info to SPS service for
* processing.
*/
- private long processPath(String startID, String childPath) {
+ private long processPath(Long startID, String childPath) {
long pendingWorkCount = 0; // to be satisfied file counter
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
- children = dfs.getClient().listPaths(childPath, lastReturnedName,
- false);
+ children = dfs.getClient().listPaths(childPath,
+ lastReturnedName, false);
} catch (IOException e) {
LOG.warn("Failed to list directory " + childPath
+ ". Ignore the directory and continue.", e);
@@ -93,18 +94,18 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
}
for (HdfsFileStatus child : children.getPartialListing()) {
- String childFullPath = child.getFullName(childPath);
if (child.isFile()) {
- service.addFileToProcess(
- new ItemInfo<String>(startID, childFullPath), false);
+ service.addFileToProcess(new ItemInfo(startID, child.getFileId()),
+ false);
checkProcessingQueuesFree();
pendingWorkCount++; // increment to be satisfied file count
} else {
+ String childFullPathName = child.getFullName(childPath);
if (child.isDirectory()) {
- if (!childFullPath.endsWith(Path.SEPARATOR)) {
- childFullPath = childFullPath + Path.SEPARATOR;
+ if (!childFullPathName.endsWith(Path.SEPARATOR)) {
+ childFullPathName = childFullPathName + Path.SEPARATOR;
}
- pendingWorkCount += processPath(startID, childFullPath);
+ pendingWorkCount += processPath(startID, childFullPathName);
}
}
}
@@ -150,11 +151,12 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
}
@Override
- public void scanAndCollectFiles(String path) throws IOException {
+ public void scanAndCollectFiles(long pathId) throws IOException {
if (dfs == null) {
dfs = getFS(service.getConf());
}
- long pendingSatisfyItemsCount = processPath(path, path);
+ Path filePath = DFSUtilClient.makePathFromFileId(pathId);
+ long pendingSatisfyItemsCount = processPath(pathId, filePath.toString());
// Check whether the given path contains any item to be tracked
// or the no to be satisfied paths. In case of empty list, add the given
// inodeId to the 'pendingWorkForDirectory' with empty list so that later
@@ -162,10 +164,10 @@ public class ExternalSPSFilePathCollector implements FileCollector <String>{
// this path is already satisfied the storage policy.
if (pendingSatisfyItemsCount <= 0) {
LOG.debug("There is no pending items to satisfy the given path "
- + "inodeId:{}", path);
- service.addAllFilesToProcess(path, new ArrayList<>(), true);
+ + "inodeId:{}", pathId);
+ service.addAllFilesToProcess(pathId, new ArrayList<>(), true);
} else {
- service.markScanCompletedForPath(path);
+ service.markScanCompletedForPath(pathId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 236b887..af90f0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -32,11 +31,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -68,8 +65,7 @@ public final class ExternalStoragePolicySatisfier {
HdfsConfiguration spsConf = new HdfsConfiguration();
// login with SPS keytab
secureLogin(spsConf);
- StoragePolicySatisfier<String> sps = new StoragePolicySatisfier<String>(
- spsConf);
+ StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
nnc = getNameNodeConnector(spsConf);
boolean spsRunning;
@@ -82,12 +78,7 @@ public final class ExternalStoragePolicySatisfier {
}
ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
- ExternalBlockMovementListener blkMoveListener =
- new ExternalBlockMovementListener();
- ExternalSPSBlockMoveTaskHandler externalHandler =
- new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
- sps.init(context, new ExternalSPSFilePathCollector(sps), externalHandler,
- blkMoveListener);
+ sps.init(context);
sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
if (sps != null) {
sps.join();
@@ -132,21 +123,4 @@ public final class ExternalStoragePolicySatisfier {
}
}
}
-
- /**
- * It is implementation of BlockMovementListener.
- */
- private static class ExternalBlockMovementListener
- implements BlockMovementListener {
-
- private List<Block> actualBlockMovements = new ArrayList<>();
-
- @Override
- public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
- for (Block block : moveAttemptFinishedBlks) {
- actualBlockMovements.add(block);
- }
- LOG.info("Movement attempted blocks:{}", actualBlockMovements);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
index 2acc5a8..89edfbf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/NamenodeProtocol.proto
@@ -218,7 +218,7 @@ message GetNextSPSPathRequestProto {
}
message GetNextSPSPathResponseProto {
- optional string spsPath = 1;
+ optional uint64 spsPath = 1;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0af615/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index ed1fe92..f85769f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -45,22 +45,22 @@ import org.mockito.Mockito;
*/
public class TestBlockStorageMovementAttemptedItems {
- private BlockStorageMovementAttemptedItems<Long> bsmAttemptedItems;
- private BlockStorageMovementNeeded<Long> unsatisfiedStorageMovementFiles;
+ private BlockStorageMovementAttemptedItems bsmAttemptedItems;
+ private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles;
private final int selfRetryTimeout = 500;
@Before
public void setup() throws Exception {
Configuration config = new HdfsConfiguration();
- Context<Long> ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
- SPSService<Long> sps = new StoragePolicySatisfier<Long>(config);
+ Context ctxt = Mockito.mock(IntraSPSNameNodeContext.class);
+ SPSService sps = new StoragePolicySatisfier(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
unsatisfiedStorageMovementFiles =
- new BlockStorageMovementNeeded<Long>(ctxt, null);
- bsmAttemptedItems = new BlockStorageMovementAttemptedItems<Long>(sps,
- unsatisfiedStorageMovementFiles, null);
+ new BlockStorageMovementNeeded(ctxt);
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
+ unsatisfiedStorageMovementFiles, ctxt);
}
@After
@@ -76,7 +76,7 @@ public class TestBlockStorageMovementAttemptedItems {
long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false;
while (monotonicNow() < (stopTime)) {
- ItemInfo<Long> ele = null;
+ ItemInfo ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
if (item == ele.getFile()) {
isItemFound = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org