You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/02/16 11:33:23 UTC
[2/2] hadoop git commit: HDFS-13110: [SPS]: Reduce the number of APIs
in NamenodeProtocol used by external satisfier. Contributed by Rakesh R.
HDFS-13110: [SPS]: Reduce the number of APIs in NamenodeProtocol used by external satisfier. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3634bdcc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3634bdcc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3634bdcc
Branch: refs/heads/HDFS-10285
Commit: 3634bdcc8d2f4c2a33b9275fc1b3599a74b2c58a
Parents: 4a42e7a
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Fri Feb 16 17:01:38 2018 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Fri Feb 16 17:01:38 2018 +0530
----------------------------------------------------------------------
.../NamenodeProtocolServerSideTranslatorPB.java | 46 +----
.../NamenodeProtocolTranslatorPB.java | 42 +----
.../hdfs/server/namenode/FSTreeTraverser.java | 2 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 32 +---
.../server/namenode/ReencryptionHandler.java | 2 +-
.../sps/BlockStorageMovementAttemptedItems.java | 42 +++--
.../sps/BlockStorageMovementNeeded.java | 119 +++++++------
.../hdfs/server/namenode/sps/Context.java | 55 +++---
.../hdfs/server/namenode/sps/FileCollector.java | 48 +++++
.../server/namenode/sps/FileIdCollector.java | 43 -----
.../namenode/sps/IntraSPSNameNodeContext.java | 39 ++---
.../sps/IntraSPSNameNodeFileIdCollector.java | 23 +--
.../hdfs/server/namenode/sps/ItemInfo.java | 39 +++--
.../hdfs/server/namenode/sps/SPSService.java | 32 ++--
.../namenode/sps/StoragePolicySatisfier.java | 129 +++++++++-----
.../sps/StoragePolicySatisfyManager.java | 6 +-
.../hdfs/server/protocol/NamenodeProtocol.java | 24 +--
.../sps/ExternalSPSBlockMoveTaskHandler.java | 4 +-
.../hdfs/server/sps/ExternalSPSContext.java | 60 +++----
.../server/sps/ExternalSPSFileIDCollector.java | 174 -------------------
.../sps/ExternalSPSFilePathCollector.java | 172 ++++++++++++++++++
.../sps/ExternalStoragePolicySatisfier.java | 7 +-
.../src/main/proto/NamenodeProtocol.proto | 27 +--
.../TestBlockStorageMovementAttemptedItems.java | 27 ++-
.../sps/TestStoragePolicySatisfier.java | 52 +++---
...stStoragePolicySatisfierWithStripedFile.java | 15 +-
.../sps/TestExternalStoragePolicySatisfier.java | 148 +++++++++++-----
27 files changed, 701 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
index 16794b6..5ab9c6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolServerSideTranslatorPB.java
@@ -35,16 +35,12 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -266,15 +262,15 @@ public class NamenodeProtocolServerSideTranslatorPB implements
}
@Override
- public GetNextSPSPathIdResponseProto getNextSPSPathId(
- RpcController controller, GetNextSPSPathIdRequestProto request)
+ public GetNextSPSPathResponseProto getNextSPSPath(
+ RpcController controller, GetNextSPSPathRequestProto request)
throws ServiceException {
try {
- Long nextSPSPathId = impl.getNextSPSPathId();
- if (nextSPSPathId == null) {
- return GetNextSPSPathIdResponseProto.newBuilder().build();
+ String nextSPSPath = impl.getNextSPSPath();
+ if (nextSPSPath == null) {
+ return GetNextSPSPathResponseProto.newBuilder().build();
}
- return GetNextSPSPathIdResponseProto.newBuilder().setFileId(nextSPSPathId)
+ return GetNextSPSPathResponseProto.newBuilder().setSpsPath(nextSPSPath)
.build();
} catch (IOException e) {
throw new ServiceException(e);
@@ -282,17 +278,6 @@ public class NamenodeProtocolServerSideTranslatorPB implements
}
@Override
- public GetFilePathResponseProto getFilePath(RpcController controller,
- GetFilePathRequestProto request) throws ServiceException {
- try {
- return GetFilePathResponseProto.newBuilder()
- .setSrcPath(impl.getFilePath(request.getFileId())).build();
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
-
- @Override
public CheckDNSpaceResponseProto checkDNSpaceForScheduling(
RpcController controller, CheckDNSpaceRequestProto request)
throws ServiceException {
@@ -308,19 +293,4 @@ public class NamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
-
- @Override
- public HasLowRedundancyBlocksResponseProto hasLowRedundancyBlocks(
- RpcController controller, HasLowRedundancyBlocksRequestProto request)
- throws ServiceException {
- try {
- return HasLowRedundancyBlocksResponseProto.newBuilder()
- .setHasLowRedundancyBlocks(
- impl.hasLowRedundancyBlocks(request.getInodeId()))
- .build();
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 28784ce..5bc98c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -34,12 +34,10 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeys
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetFilePathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathIdResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetNextSPSPathResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.HasLowRedundancyBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsRollingUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.IsUpgradeFinalizedRequestProto;
@@ -270,24 +268,13 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
}
@Override
- public Long getNextSPSPathId() throws IOException {
- GetNextSPSPathIdRequestProto req =
- GetNextSPSPathIdRequestProto.newBuilder().build();
+ public String getNextSPSPath() throws IOException {
+ GetNextSPSPathRequestProto req =
+ GetNextSPSPathRequestProto.newBuilder().build();
try {
- GetNextSPSPathIdResponseProto nextSPSPathId =
- rpcProxy.getNextSPSPathId(NULL_CONTROLLER, req);
- return nextSPSPathId.hasFileId() ? nextSPSPathId.getFileId() : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- }
-
- @Override
- public String getFilePath(Long inodeId) throws IOException {
- GetFilePathRequestProto req =
- GetFilePathRequestProto.newBuilder().setFileId(inodeId).build();
- try {
- return rpcProxy.getFilePath(NULL_CONTROLLER, req).getSrcPath();
+ GetNextSPSPathResponseProto nextSPSPath =
+ rpcProxy.getNextSPSPath(NULL_CONTROLLER, req);
+ return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -307,17 +294,4 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
throw ProtobufHelper.getRemoteException(e);
}
}
-
- @Override
- public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
- HasLowRedundancyBlocksRequestProto req = HasLowRedundancyBlocksRequestProto
- .newBuilder().setInodeId(inodeId).build();
- try {
- return rpcProxy.hasLowRedundancyBlocks(NULL_CONTROLLER, req)
- .getHasLowRedundancyBlocks();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
index abf6a4c..badeed1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -284,7 +284,7 @@ public abstract class FSTreeTraverser {
* @throws IOException
* @throws InterruptedException
*/
- protected abstract void submitCurrentBatch(long startId)
+ protected abstract void submitCurrentBatch(Long startId)
throws IOException, InterruptedException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index c582eab..f3b016e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2530,20 +2530,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
- public String getFilePath(Long inodeId) throws IOException {
+ public String getNextSPSPath() throws IOException {
checkNNStartup();
- String operationName = "getFilePath";
- namesystem.checkSuperuserPrivilege(operationName);
- if (nn.isStandbyState()) {
- throw new StandbyException("Not supported by Standby Namenode.");
- }
- return namesystem.getFilePath(inodeId);
- }
-
- @Override
- public Long getNextSPSPathId() throws IOException {
- checkNNStartup();
- String operationName = "getNextSPSPathId";
+ String operationName = "getNextSPSPath";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
@@ -2557,7 +2546,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
+ " inside namenode, so external SPS is not allowed to fetch"
+ " the path Ids");
}
- return namesystem.getBlockManager().getSPSManager().getNextPathId();
+ Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId();
+ if (pathId == null) {
+ return null;
+ }
+ return namesystem.getFilePath(pathId);
}
@Override
@@ -2572,15 +2565,4 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return namesystem.getBlockManager().getDatanodeManager()
.verifyTargetDatanodeHasSpaceForScheduling(dn, type, estimatedSize);
}
-
- @Override
- public boolean hasLowRedundancyBlocks(long inodeId) throws IOException {
- checkNNStartup();
- String operationName = "hasLowRedundancyBlocks";
- namesystem.checkSuperuserPrivilege(operationName);
- if (nn.isStandbyState()) {
- throw new StandbyException("Not supported by Standby Namenode.");
- }
- return namesystem.getBlockManager().hasLowRedundancyBlocks(inodeId);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index 9b00519..4043687 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -701,7 +701,7 @@ public class ReencryptionHandler implements Runnable {
* @throws InterruptedException
*/
@Override
- protected void submitCurrentBatch(final long zoneId) throws IOException,
+ protected void submitCurrentBatch(final Long zoneId) throws IOException,
InterruptedException {
if (currentBatch.isEmpty()) {
return;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index ea7a093..d2f0bb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -45,8 +45,13 @@ import com.google.common.annotations.VisibleForTesting;
* entries from tracking. If there is no DN reports about movement attempt
* finished for a longer time period, then such items will retries automatically
* after timeout. The default timeout would be 5 minutes.
+ *
+ * @param <T>
+ * identifier of the file: its inode id or its full path name. The
+ * internal SPS uses the file inodeId for block movement; the external
+ * SPS uses the file's string path representation.
*/
-public class BlockStorageMovementAttemptedItems{
+public class BlockStorageMovementAttemptedItems<T> {
private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@@ -54,7 +59,7 @@ public class BlockStorageMovementAttemptedItems{
* A map holds the items which are already taken for blocks movements
* processing and sent to DNs.
*/
- private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+ private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems;
private final List<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
@@ -70,11 +75,11 @@ public class BlockStorageMovementAttemptedItems{
// a request is timed out.
//
private long minCheckTimeout = 1 * 60 * 1000; // minimum value
- private BlockStorageMovementNeeded blockStorageMovementNeeded;
- private final SPSService service;
+ private BlockStorageMovementNeeded<T> blockStorageMovementNeeded;
+ private final SPSService<T> service;
- public BlockStorageMovementAttemptedItems(SPSService service,
- BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+ public BlockStorageMovementAttemptedItems(SPSService<T> service,
+ BlockStorageMovementNeeded<T> unsatisfiedStorageMovementFiles,
BlockMovementListener blockMovementListener) {
this.service = service;
long recheckTimeout = this.service.getConf().getLong(
@@ -100,7 +105,7 @@ public class BlockStorageMovementAttemptedItems{
* @param itemInfo
* - tracking info
*/
- public void add(AttemptedItemInfo itemInfo) {
+ public void add(AttemptedItemInfo<T> itemInfo) {
synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.add(itemInfo);
}
@@ -190,25 +195,24 @@ public class BlockStorageMovementAttemptedItems{
@VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+ Iterator<AttemptedItemInfo<T>> iter = storageMovementAttemptedItems
.iterator();
long now = monotonicNow();
while (iter.hasNext()) {
- AttemptedItemInfo itemInfo = iter.next();
+ AttemptedItemInfo<T> itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
- Long blockCollectionID = itemInfo.getFileId();
+ T file = itemInfo.getFile();
synchronized (movementFinishedBlocks) {
- ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
- blockCollectionID, itemInfo.getRetryCount() + 1);
+ ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(),
+ file, itemInfo.getRetryCount() + 1);
blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
- + "retries queue for next iteration.", blockCollectionID);
+ + "retries queue for next iteration.", file);
}
}
}
-
}
}
@@ -219,17 +223,17 @@ public class BlockStorageMovementAttemptedItems{
while (finishedBlksIter.hasNext()) {
Block blk = finishedBlksIter.next();
synchronized (storageMovementAttemptedItems) {
- Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
- .iterator();
+ Iterator<AttemptedItemInfo<T>> iterator =
+ storageMovementAttemptedItems.iterator();
while (iterator.hasNext()) {
- AttemptedItemInfo attemptedItemInfo = iterator.next();
+ AttemptedItemInfo<T> attemptedItemInfo = iterator.next();
attemptedItemInfo.getBlocks().remove(blk);
if (attemptedItemInfo.getBlocks().isEmpty()) {
// TODO: try add this at front of the Queue, so that this element
// gets the chance first and can be cleaned from queue quickly as
// all movements already done.
- blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
- .getStartId(), attemptedItemInfo.getFileId(),
+ blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo
+ .getStartPath(), attemptedItemInfo.getFile(),
attemptedItemInfo.getRetryCount() + 1));
iterator.remove();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index c683a63..a194876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -43,31 +43,36 @@ import com.google.common.annotations.VisibleForTesting;
* schedule the block collection IDs for movement. It track the info of
* scheduled items and remove the SPS xAttr from the file/Directory once
* movement is success.
+ *
+ * @param <T>
+ * identifier of the file: its inode id or its full path name. The
+ * internal SPS uses the file inodeId for block movement; the external
+ * SPS uses the file's string path representation.
*/
@InterfaceAudience.Private
-public class BlockStorageMovementNeeded {
+public class BlockStorageMovementNeeded<T> {
public static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementNeeded.class);
- private final Queue<ItemInfo> storageMovementNeeded =
- new LinkedList<ItemInfo>();
+ private final Queue<ItemInfo<T>> storageMovementNeeded =
+ new LinkedList<ItemInfo<T>>();
/**
- * Map of startId and number of child's. Number of child's indicate the
+ * Map of startPath to number of children. Number of children indicates the
* number of files pending to satisfy the policy.
*/
- private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
- new HashMap<Long, DirPendingWorkInfo>();
+ private final Map<T, DirPendingWorkInfo> pendingWorkForDirectory =
+ new HashMap<>();
- private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
+ private final Map<T, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>();
- private final Context ctxt;
+ private final Context<T> ctxt;
private Daemon pathIdCollector;
- private FileIdCollector fileIDCollector;
+ private FileCollector<T> fileCollector;
private SPSPathIdProcessor pathIDProcessor;
@@ -75,10 +80,10 @@ public class BlockStorageMovementNeeded {
// NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000;
- public BlockStorageMovementNeeded(Context context,
- FileIdCollector fileIDCollector) {
+ public BlockStorageMovementNeeded(Context<T> context,
+ FileCollector<T> fileCollector) {
this.ctxt = context;
- this.fileIDCollector = fileIDCollector;
+ this.fileCollector = fileCollector;
pathIDProcessor = new SPSPathIdProcessor();
}
@@ -89,8 +94,8 @@ public class BlockStorageMovementNeeded {
* @param trackInfo
* - track info for satisfy the policy
*/
- public synchronized void add(ItemInfo trackInfo) {
- spsStatus.put(trackInfo.getStartId(),
+ public synchronized void add(ItemInfo<T> trackInfo) {
+ spsStatus.put(trackInfo.getFile(),
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
storageMovementNeeded.add(trackInfo);
@@ -100,8 +105,8 @@ public class BlockStorageMovementNeeded {
* Add the itemInfo list to tracking list for which storage movement expected
* if necessary.
*
- * @param startId
- * - start id
+ * @param startPath
+ * - start path
* @param itemInfoList
* - List of child in the directory
* @param scanCompleted
@@ -109,10 +114,10 @@ public class BlockStorageMovementNeeded {
* scan.
*/
@VisibleForTesting
- public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
+ public synchronized void addAll(T startPath, List<ItemInfo<T>> itemInfoList,
boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
- updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
+ updatePendingDirScanStats(startPath, itemInfoList.size(), scanCompleted);
}
/**
@@ -126,22 +131,22 @@ public class BlockStorageMovementNeeded {
* elements to scan.
*/
@VisibleForTesting
- public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
+ public synchronized void add(ItemInfo<T> itemInfo, boolean scanCompleted) {
storageMovementNeeded.add(itemInfo);
// This represents sps start id is file, so no need to update pending dir
// stats.
- if (itemInfo.getStartId() == itemInfo.getFileId()) {
+ if (itemInfo.getStartPath() == itemInfo.getFile()) {
return;
}
- updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted);
+ updatePendingDirScanStats(itemInfo.getFile(), 1, scanCompleted);
}
- private void updatePendingDirScanStats(long startId, int numScannedFiles,
+ private void updatePendingDirScanStats(T startPath, int numScannedFiles,
boolean scanCompleted) {
- DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startPath);
if (pendingWork == null) {
pendingWork = new DirPendingWorkInfo();
- pendingWorkForDirectory.put(startId, pendingWork);
+ pendingWorkForDirectory.put(startPath, pendingWork);
}
pendingWork.addPendingWorkCount(numScannedFiles);
if (scanCompleted) {
@@ -150,12 +155,12 @@ public class BlockStorageMovementNeeded {
}
/**
- * Gets the block collection id for which storage movements check necessary
+ * Gets the satisfier files for which block storage movements check necessary
* and make the movement if required.
*
- * @return block collection ID
+ * @return satisfier files
*/
- public synchronized ItemInfo get() {
+ public synchronized ItemInfo<T> get() {
return storageMovementNeeded.poll();
}
@@ -176,12 +181,12 @@ public class BlockStorageMovementNeeded {
* Decrease the pending child count for directory once one file blocks moved
* successfully. Remove the SPS xAttr if pending child count is zero.
*/
- public synchronized void removeItemTrackInfo(ItemInfo trackInfo,
+ public synchronized void removeItemTrackInfo(ItemInfo<T> trackInfo,
boolean isSuccess) throws IOException {
if (trackInfo.isDir()) {
// If track is part of some start inode then reduce the pending
// directory work count.
- long startId = trackInfo.getStartId();
+ T startId = trackInfo.getStartPath();
if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId);
@@ -202,17 +207,17 @@ public class BlockStorageMovementNeeded {
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
- ctxt.removeSPSHint(trackInfo.getFileId());
- updateStatus(trackInfo.getStartId(), isSuccess);
+ ctxt.removeSPSHint(trackInfo.getFile());
+ updateStatus(trackInfo.getFile(), isSuccess);
}
}
- public synchronized void clearQueue(long trackId) {
+ public synchronized void clearQueue(T trackId) {
ctxt.removeSPSPathId(trackId);
- Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
+ Iterator<ItemInfo<T>> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
- ItemInfo next = iterator.next();
- if (next.getStartId() == trackId) {
+ ItemInfo<T> next = iterator.next();
+ if (next.getFile() == trackId) {
iterator.remove();
}
}
@@ -222,7 +227,7 @@ public class BlockStorageMovementNeeded {
/**
* Mark inode status as SUCCESS in map.
*/
- private void updateStatus(long startId, boolean isSuccess){
+ private void updateStatus(T startId, boolean isSuccess){
StoragePolicySatisfyPathStatusInfo spsStatusInfo =
spsStatus.get(startId);
if (spsStatusInfo == null) {
@@ -244,8 +249,8 @@ public class BlockStorageMovementNeeded {
*/
public synchronized void clearQueuesWithNotification() {
// Remove xAttr from directories
- Long trackId;
- while ((trackId = ctxt.getNextSPSPathId()) != null) {
+ T trackId;
+ while ((trackId = ctxt.getNextSPSPath()) != null) {
try {
// Remove xAttr for file
ctxt.removeSPSHint(trackId);
@@ -256,17 +261,17 @@ public class BlockStorageMovementNeeded {
// File's directly added to storageMovementNeeded, So try to remove
// xAttr for file
- ItemInfo itemInfo;
+ ItemInfo<T> itemInfo;
while ((itemInfo = get()) != null) {
try {
// Remove xAttr for file
if (!itemInfo.isDir()) {
- ctxt.removeSPSHint(itemInfo.getFileId());
+ ctxt.removeSPSHint(itemInfo.getFile());
}
} catch (IOException ie) {
LOG.warn(
"Failed to remove SPS xattr for track id "
- + itemInfo.getFileId(), ie);
+ + itemInfo.getFile(), ie);
}
}
this.clearAll();
@@ -282,29 +287,29 @@ public class BlockStorageMovementNeeded {
public void run() {
LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0;
- Long startINodeId = null;
+ T startINode = null;
while (ctxt.isRunning()) {
try {
if (!ctxt.isInSafeMode()) {
- if (startINodeId == null) {
- startINodeId = ctxt.getNextSPSPathId();
+ if (startINode == null) {
+ startINode = ctxt.getNextSPSPath();
} // else same id will be retried
- if (startINodeId == null) {
+ if (startINode == null) {
// Waiting for SPS path
Thread.sleep(3000);
} else {
- spsStatus.put(startINodeId,
+ spsStatus.put(startINode,
new StoragePolicySatisfyPathStatusInfo(
StoragePolicySatisfyPathStatus.IN_PROGRESS));
- fileIDCollector.scanAndCollectFileIds(startINodeId);
+ fileCollector.scanAndCollectFiles(startINode);
// check if directory was empty and no child added to queue
DirPendingWorkInfo dirPendingWorkInfo =
- pendingWorkForDirectory.get(startINodeId);
+ pendingWorkForDirectory.get(startINode);
if (dirPendingWorkInfo != null
&& dirPendingWorkInfo.isDirWorkDone()) {
- ctxt.removeSPSHint(startINodeId);
- pendingWorkForDirectory.remove(startINodeId);
- updateStatus(startINodeId, true);
+ ctxt.removeSPSHint(startINode);
+ pendingWorkForDirectory.remove(startINode);
+ updateStatus(startINode, true);
}
}
//Clear the SPS status if status is in SUCCESS more than 5 min.
@@ -313,7 +318,7 @@ public class BlockStorageMovementNeeded {
lastStatusCleanTime = Time.monotonicNow();
cleanSPSStatus();
}
- startINodeId = null; // Current inode id successfully scanned.
+ startINode = null; // Current inode successfully scanned.
}
} catch (Throwable t) {
String reClass = t.getClass().getName();
@@ -334,9 +339,9 @@ public class BlockStorageMovementNeeded {
}
private synchronized void cleanSPSStatus() {
- for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
- spsStatus.entrySet().iterator(); it.hasNext();) {
- Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
+ for (Iterator<Entry<T, StoragePolicySatisfyPathStatusInfo>> it = spsStatus
+ .entrySet().iterator(); it.hasNext();) {
+ Entry<T, StoragePolicySatisfyPathStatusInfo> entry = it.next();
if (entry.getValue().canRemove()) {
it.remove();
}
@@ -472,8 +477,8 @@ public class BlockStorageMovementNeeded {
return statusClearanceElapsedTimeMs;
}
- public void markScanCompletedForDir(Long inodeId) {
- DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId);
+ public void markScanCompletedForDir(T inode) {
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inode);
if (pendingWork != null) {
pendingWork.markScanCompleted();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index ff4ad6b..84a969d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -33,11 +33,16 @@ import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
/**
- * An interface for the communication between NameNode and SPS module.
+ * An interface for the communication between SPS and Namenode module.
+ *
+ * @param <T>
+ * is identifier of inode or full path name of inode. Internal sps will
+ * use the file inodeId for the block movement. External sps will use
+ * file string path representation for the block movement.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface Context {
+public interface Context<T> {
/**
* Returns true if the SPS is running, false otherwise.
@@ -72,13 +77,13 @@ public interface Context {
NetworkTopology getNetworkTopology();
/**
- * Returns true if the give Inode exists in the Namespace.
+ * Returns true if the given file exists in the Namespace.
*
- * @param inodeId
- * - Inode ID
- * @return true if Inode exists, false otherwise.
+ * @param filePath
+ * - file info
+ * @return true if the given file exists, false otherwise.
*/
- boolean isFileExist(long inodeId);
+ boolean isFileExist(T filePath);
/**
* Gets the storage policy details for the given policy ID.
@@ -97,11 +102,11 @@ public interface Context {
/**
* Remove the hint which was added to track SPS call.
*
- * @param inodeId
- * - Inode ID
+ * @param spsPath
+ * - user invoked satisfier path
* @throws IOException
*/
- void removeSPSHint(long inodeId) throws IOException;
+ void removeSPSHint(T spsPath) throws IOException;
/**
* Gets the number of live datanodes in the cluster.
@@ -113,11 +118,11 @@ public interface Context {
/**
* Get the file info for a specific file.
*
- * @param inodeID
- * inode identifier
+ * @param file
+ * file path
* @return file status metadata information
*/
- HdfsFileStatus getFileInfo(long inodeID) throws IOException;
+ HdfsFileStatus getFileInfo(T file) throws IOException;
/**
* Returns all the live datanodes and its storage details.
@@ -128,15 +133,6 @@ public interface Context {
throws IOException;
/**
- * Returns true if the given inode file has low redundancy blocks.
- *
- * @param inodeID
- * inode identifier
- * @return true if block collection has low redundancy blocks
- */
- boolean hasLowRedundancyBlocks(long inodeID);
-
- /**
* Checks whether the given datanode has sufficient space to occupy the given
* blockSize data.
*
@@ -153,26 +149,17 @@ public interface Context {
long blockSize);
/**
- * @return next SPS path id to process.
+ * @return next SPS path info to process.
*/
- Long getNextSPSPathId();
+ T getNextSPSPath();
/**
* Removes the SPS path id.
*/
- void removeSPSPathId(long pathId);
+ void removeSPSPathId(T pathId);
/**
* Removes all SPS path ids.
*/
void removeAllSPSPathIds();
-
- /**
- * Gets the file path for a given inode id.
- *
- * @param inodeId
- * - path inode id.
- */
- String getFilePath(Long inodeId);
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
new file mode 100644
index 0000000..dceb5fa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileCollector.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface for scanning the directory recursively and collect files
+ * under the given directory.
+ *
+ * @param <T>
+ * is identifier of inode or full path name of inode. Internal sps will
+ * use the file inodeId for the block movement. External sps will use
+ * file string path representation for the block movement.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FileCollector<T> {
+
+ /**
+ * This method can be used to scan and collect the files under the given
+ * directory and add them to the given BlockStorageMovementNeeded.
+ *
+ * @param filePath
+ * - file path
+ */
+ void scanAndCollectFiles(T filePath)
+ throws IOException, InterruptedException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
deleted file mode 100644
index 7cf77f0..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.namenode.sps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * An interface for scanning the directory recursively and collect file ids
- * under the given directory.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface FileIdCollector {
-
- /**
- * Scans the given inode directory and collects the file ids under that
- * directory and adds to the given BlockStorageMovementNeeded.
- *
- * @param inodeID
- * - The directory ID
- */
- void scanAndCollectFileIds(Long inodeId)
- throws IOException, InterruptedException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 495d1c4..f6b6d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -47,17 +47,17 @@ import org.slf4j.LoggerFactory;
* movements to satisfy the storage policy.
*/
@InterfaceAudience.Private
-public class IntraSPSNameNodeContext implements Context {
+public class IntraSPSNameNodeContext implements Context<Long> {
private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class);
private final Namesystem namesystem;
private final BlockManager blockManager;
- private SPSService service;
+ private SPSService<Long> service;
public IntraSPSNameNodeContext(Namesystem namesystem,
- BlockManager blockManager, SPSService service) {
+ BlockManager blockManager, SPSService<Long> service) {
this.namesystem = namesystem;
this.blockManager = blockManager;
this.service = service;
@@ -68,20 +68,18 @@ public class IntraSPSNameNodeContext implements Context {
return blockManager.getDatanodeManager().getNumLiveDataNodes();
}
+ /**
+ * @return object containing information regarding the file or null if file
+ * not found.
+ */
@Override
- public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+ public HdfsFileStatus getFileInfo(Long inodeID) throws IOException {
String filePath = namesystem.getFilePath(inodeID);
if (StringUtils.isBlank(filePath)) {
LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
return null;
}
- HdfsFileStatus fileInfo = null;
- try {
- fileInfo = namesystem.getFileInfo(filePath, true, true);
- } catch (IOException e) {
- LOG.debug("File path:{} doesn't exists!", filePath);
- }
- return fileInfo;
+ return namesystem.getFileInfo(filePath, true, true);
}
@Override
@@ -97,17 +95,12 @@ public class IntraSPSNameNodeContext implements Context {
}
@Override
- public boolean hasLowRedundancyBlocks(long inodeId) {
- return blockManager.hasLowRedundancyBlocks(inodeId);
- }
-
- @Override
- public boolean isFileExist(long inodeId) {
+ public boolean isFileExist(Long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null;
}
@Override
- public void removeSPSHint(long inodeId) throws IOException {
+ public void removeSPSHint(Long inodeId) throws IOException {
this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
}
@@ -177,12 +170,12 @@ public class IntraSPSNameNodeContext implements Context {
}
@Override
- public Long getNextSPSPathId() {
+ public Long getNextSPSPath() {
return blockManager.getSPSManager().getNextPathId();
}
@Override
- public void removeSPSPathId(long trackId) {
+ public void removeSPSPathId(Long trackId) {
blockManager.getSPSManager().removePathId(trackId);
}
@@ -190,10 +183,4 @@ public class IntraSPSNameNodeContext implements Context {
public void removeAllSPSPathIds() {
blockManager.getSPSManager().removeAllPathIds();
}
-
- @Override
- public String getFilePath(Long inodeId) {
- return namesystem.getFilePath(inodeId);
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index 7a44dd9..27d9e7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -35,15 +35,16 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
*/
@InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
- implements FileIdCollector {
+ implements FileCollector<Long> {
private int maxQueueLimitToScan;
- private final SPSService service;
+ private final SPSService <Long> service;
private int remainingCapacity = 0;
- private List<ItemInfo> currentBatch;
+ private List<ItemInfo<Long>> currentBatch;
- public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) {
+ public IntraSPSNameNodeFileIdCollector(FSDirectory dir,
+ SPSService<Long> service) {
super(dir);
this.service = service;
this.maxQueueLimitToScan = service.getConf().getInt(
@@ -63,7 +64,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
return false;
}
if (inode.isFile() && inode.asFile().numBlocks() != 0) {
- currentBatch.add(new ItemInfo(
+ currentBatch.add(new ItemInfo<Long>(
((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
remainingCapacity--;
}
@@ -83,10 +84,10 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
}
@Override
- protected void submitCurrentBatch(long startId)
+ protected void submitCurrentBatch(Long startId)
throws IOException, InterruptedException {
// Add current child's to queue
- service.addAllFileIdsToProcess(startId,
+ service.addAllFilesToProcess(startId,
currentBatch, false);
currentBatch.clear();
}
@@ -119,7 +120,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
}
@Override
- public void scanAndCollectFileIds(final Long startINodeId)
+ public void scanAndCollectFiles(final Long startINodeId)
throws IOException, InterruptedException {
FSDirectory fsd = getFSDirectory();
INode startInode = fsd.getInode(startINodeId);
@@ -129,9 +130,9 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
throttle();
}
if (startInode.isFile()) {
- currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId()));
+ currentBatch
+ .add(new ItemInfo<Long>(startInode.getId(), startInode.getId()));
} else {
-
readLock();
// NOTE: this lock will not be held for full directory scanning. It is
// basically a sliced locking. Once it collects a batch size( at max the
@@ -148,7 +149,7 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
}
}
// Mark startInode traverse is done, this is last-batch
- service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true);
+ service.addAllFilesToProcess(startInode.getId(), currentBatch, true);
currentBatch.clear();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
index 47c64cc..bd8ab92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
@@ -21,48 +21,51 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
- * ItemInfo is a file info object for which need to satisfy the policy.
+ * ItemInfo is a file info object whose policy needs to be satisfied. For
+ * internal satisfier service, it uses inode id which is Long datatype. For the
+ * external satisfier service, it uses the full string representation of the
+ * path.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class ItemInfo {
- private long startId;
- private long fileId;
+public class ItemInfo<T> {
+ private T startPath;
+ private T file;
private int retryCount;
- public ItemInfo(long startId, long fileId) {
- this.startId = startId;
- this.fileId = fileId;
+ public ItemInfo(T startPath, T file) {
+ this.startPath = startPath;
+ this.file = file;
// set 0 when item is getting added first time in queue.
this.retryCount = 0;
}
- public ItemInfo(final long startId, final long fileId, final int retryCount) {
- this.startId = startId;
- this.fileId = fileId;
+ public ItemInfo(final T startPath, final T file, final int retryCount) {
+ this.startPath = startPath;
+ this.file = file;
this.retryCount = retryCount;
}
/**
- * Return the start inode id of the current track Id. This indicates that SPS
- * was invoked on this inode id.
+ * Returns the start path of the current file. This indicates that SPS
+ * was invoked on this path.
*/
- public long getStartId() {
- return startId;
+ public T getStartPath() {
+ return startPath;
}
/**
- * Return the File inode Id for which needs to satisfy the policy.
+ * Returns the file for which the policy needs to be satisfied.
*/
- public long getFileId() {
- return fileId;
+ public T getFile() {
+ return file;
}
/**
* Returns true if the tracking path is a directory, false otherwise.
*/
public boolean isDir() {
- return (startId != fileId);
+ return !startPath.equals(file);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index da6e365..71d8fd1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -27,10 +27,15 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
/**
* An interface for SPSService, which exposes life cycle and processing APIs.
+ *
+ * @param <T>
+ * is identifier of inode or full path name of inode. Internal sps will
+ * use the file inodeId for the block movement. External sps will use
+ * file string path representation for the block movement.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public interface SPSService {
+public interface SPSService<T> {
/**
* Initializes the helper services.
@@ -38,7 +43,7 @@ public interface SPSService {
* @param ctxt
* - context is an helper service to provide communication channel
* between NN and SPS
- * @param fileIDCollector
+ * @param fileCollector
* - a helper service for scanning the files under a given directory
* id
* @param handler
@@ -46,7 +51,7 @@ public interface SPSService {
* @param blkMovementListener
* - listener to know about block movement attempt completion
*/
- void init(Context ctxt, FileIdCollector fileIDCollector,
+ void init(Context<T> ctxt, FileCollector<T> fileCollector,
BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
/**
@@ -82,23 +87,24 @@ public interface SPSService {
boolean isRunning();
/**
- * Adds the Item information(file id etc) to processing queue.
+ * Adds the Item information(file etc) to processing queue.
*
* @param itemInfo
+ * file info object for which the policy needs to be satisfied
*/
- void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted);
+ void addFileToProcess(ItemInfo<T> itemInfo, boolean scanCompleted);
/**
- * Adds all the Item information(file id etc) to processing queue.
+ * Adds all the Item information(file etc) to processing queue.
*
- * @param startId
- * - directory/file id, on which SPS was called.
+ * @param startPath
+ * - directory/file, on which SPS was called.
* @param itemInfoList
* - list of item infos
* @param scanCompleted
* - whether the scanning of directory fully done with itemInfoList
*/
- void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+ void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
boolean scanCompleted);
/**
@@ -109,7 +115,7 @@ public interface SPSService {
/**
* Clear inodeId present in the processing queue.
*/
- void clearQueue(long inodeId);
+ void clearQueue(T spsPath);
/**
* @return the configuration.
@@ -119,10 +125,10 @@ public interface SPSService {
/**
* Marks the scanning of directory if finished.
*
- * @param inodeId
- * - directory inode id.
+ * @param spsPath
+ * - satisfier path
*/
- void markScanCompletedForPath(Long inodeId);
+ void markScanCompletedForPath(T spsPath);
/**
* Notify the details of storage movement attempt finished blocks.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 6b449aa..08a26e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -66,7 +66,7 @@ import com.google.common.base.Preconditions;
* storage policy type in Namespace, but physical block storage movement will
* not happen until user runs "Mover Tool" explicitly for such files. The
* StoragePolicySatisfier Daemon thread implemented for addressing the case
- * where users may want to physically move the blocks by a dedidated daemon (can
+ * where users may want to physically move the blocks by a dedicated daemon (can
* run inside Namenode or stand alone) instead of running mover tool explicitly.
* Just calling client API to satisfyStoragePolicy on a file/dir will
* automatically trigger to move its physical storage locations as expected in
@@ -77,19 +77,19 @@ import com.google.common.base.Preconditions;
* physical block movements.
*/
@InterfaceAudience.Private
-public class StoragePolicySatisfier implements SPSService, Runnable {
+public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
- private BlockStorageMovementNeeded storageMovementNeeded;
- private BlockStorageMovementAttemptedItems storageMovementsMonitor;
+ private BlockStorageMovementNeeded<T> storageMovementNeeded;
+ private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor;
private volatile boolean isRunning = false;
private volatile StoragePolicySatisfierMode spsMode =
StoragePolicySatisfierMode.NONE;
private int spsWorkMultiplier;
private long blockCount = 0L;
private int blockMovementMaxRetry;
- private Context ctxt;
+ private Context<T> ctxt;
private BlockMoveTaskHandler blockMoveTaskHandler;
private final Configuration conf;
@@ -135,15 +135,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
}
- public void init(final Context context, final FileIdCollector fileIDCollector,
+ public void init(final Context<T> context,
+ final FileCollector<T> fileIDCollector,
final BlockMoveTaskHandler blockMovementTaskHandler,
final BlockMovementListener blockMovementListener) {
this.ctxt = context;
- this.storageMovementNeeded =
- new BlockStorageMovementNeeded(context, fileIDCollector);
- this.storageMovementsMonitor =
- new BlockStorageMovementAttemptedItems(this,
- storageMovementNeeded, blockMovementListener);
+ this.storageMovementNeeded = new BlockStorageMovementNeeded<T>(context,
+ fileIDCollector);
+ this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems<T>(
+ this, storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler;
this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
@@ -257,24 +257,24 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
continue;
}
try {
+ ItemInfo<T> itemInfo = null;
+ boolean retryItem = false;
if (!ctxt.isInSafeMode()) {
- ItemInfo itemInfo = storageMovementNeeded.get();
+ itemInfo = storageMovementNeeded.get();
if (itemInfo != null) {
if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
LOG.info("Failed to satisfy the policy after "
+ blockMovementMaxRetry + " retries. Removing inode "
- + itemInfo.getFileId() + " from the queue");
+ + itemInfo.getFile() + " from the queue");
storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
continue;
}
- long trackId = itemInfo.getFileId();
+ T trackId = itemInfo.getFile();
BlocksMovingAnalysis status = null;
DatanodeStorageReport[] liveDnReports;
BlockStoragePolicy existingStoragePolicy;
// TODO: presently, context internally acquire the lock
// and returns the result. Need to discuss to move the lock outside?
- boolean hasLowRedundancyBlocks = ctxt
- .hasLowRedundancyBlocks(trackId);
HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
// Check path existence.
if (fileStatus == null || fileStatus.isDir()) {
@@ -289,7 +289,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
status = analyseBlocksStorageMovementsAndAssignToDN(file,
- hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports);
+ existingStoragePolicy, liveDnReports);
switch (status.status) {
// Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY:
@@ -302,8 +302,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ "movement attempt finished report",
status.status, fileStatus.getPath());
}
- this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
- .getStartId(), itemInfo.getFileId(), monotonicNow(),
+ this.storageMovementsMonitor.add(new AttemptedItemInfo<T>(
+ itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(),
status.assignedBlocks, itemInfo.getRetryCount()));
break;
case NO_BLOCKS_TARGETS_PAIRED:
@@ -312,8 +312,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ " retry queue as none of the blocks found its eligible"
+ " targets.", trackId, fileStatus.getPath());
}
- itemInfo.increRetryCount();
- this.storageMovementNeeded.add(itemInfo);
+ retryItem = true;
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
@@ -321,8 +320,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ "retry queue as some of the blocks are low redundant.",
trackId, fileStatus.getPath());
}
- itemInfo.increRetryCount();
- this.storageMovementNeeded.add(itemInfo);
+ retryItem = true;
break;
case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) {
@@ -330,7 +328,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ "retry queue as some of the blocks movement failed.",
trackId, fileStatus.getPath());
}
- this.storageMovementNeeded.add(itemInfo);
+ retryItem = true;
break;
// Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED:
@@ -354,6 +352,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
Thread.sleep(3000);
blockCount = 0L;
}
+ if (retryItem) {
+ itemInfo.increRetryCount();
+ this.storageMovementNeeded.add(itemInfo);
+ }
} catch (IOException e) {
LOG.error("Exception during StoragePolicySatisfier execution - "
+ "will continue next cycle", e);
@@ -377,7 +379,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
- HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks,
+ HdfsLocatedFileStatus fileInfo,
BlockStoragePolicy existingStoragePolicy,
DatanodeStorageReport[] liveDns) {
BlocksMovingAnalysis.Status status =
@@ -403,9 +405,17 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
new ArrayList<>());
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
-
+ boolean hasLowRedundancyBlocks = false;
+ int replication = fileInfo.getReplication();
for (int i = 0; i < blocks.size(); i++) {
LocatedBlock blockInfo = blocks.get(i);
+
+ // Block is considered as low redundancy when the block locations array
+ // length is less than expected replication factor. If any of the blocks is
+ // low redundant, then hasLowRedundancyBlocks will be marked as true.
+ hasLowRedundancyBlocks |= isLowRedundancyBlock(blockInfo, replication,
+ ecPolicy);
+
List<StorageType> expectedStorageTypes;
if (blockInfo.isStriped()) {
if (ErasureCodingPolicyManager
@@ -446,13 +456,15 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
// policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
- } else if (hasLowRedundancyBlocks
- && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
- // Check if the previous block was successfully paired.
- status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
}
+ // If no block is paired and a few blocks are low redundant, mark the
+ // status as FEW_LOW_REDUNDANCY_BLOCKS.
+ if (hasLowRedundancyBlocks
+ && status == BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED) {
+ status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
+ }
List<Block> assignedBlockIds = new ArrayList<Block>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
@@ -471,6 +483,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
/**
+ * The given block is considered as low redundancy when the block locations
+ * length is less than expected replication factor. For EC blocks, redundancy
+ * is the summation of data + parity blocks.
+ *
+ * @param blockInfo
+ * block
+ * @param replication
+ * replication factor of the given file block
+ * @param ecPolicy
+ * erasure coding policy of the given file block
+ * @return true if the given block is low redundant.
+ */
+ private boolean isLowRedundancyBlock(LocatedBlock blockInfo, int replication,
+ ErasureCodingPolicy ecPolicy) {
+ boolean hasLowRedundancyBlock = false;
+ if (blockInfo.isStriped()) {
+ // For EC blocks, redundancy is the summation of data + parity blocks.
+ replication = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits();
+ }
+ // block is considered as low redundancy when the block locations length is
+ // less than expected replication factor.
+ hasLowRedundancyBlock = blockInfo.getLocations().length < replication ? true
+ : false;
+ return hasLowRedundancyBlock;
+ }
+
+ /**
* Compute the list of block moving information corresponding to the given
* blockId. This will check that each block location of the given block is
* satisfying the expected storage policy. If block location is not satisfied
@@ -863,7 +902,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@VisibleForTesting
- BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+ public BlockStorageMovementAttemptedItems<T> getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
@@ -880,7 +919,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
/**
* Clear queues for given track id.
*/
- public void clearQueue(long trackId) {
+ public void clearQueue(T trackId) {
storageMovementNeeded.clearQueue(trackId);
}
@@ -889,7 +928,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
*/
- final static class AttemptedItemInfo extends ItemInfo {
+ final static class AttemptedItemInfo<T> extends ItemInfo<T> {
private long lastAttemptedOrReportedTime;
private final List<Block> blocks;
@@ -903,7 +942,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* @param lastAttemptedOrReportedTime
* last attempted or reported time
*/
- AttemptedItemInfo(long rootId, long trackId,
+ AttemptedItemInfo(T rootId, T trackId,
long lastAttemptedOrReportedTime,
List<Block> blocks, int retryCount) {
super(rootId, trackId, retryCount);
@@ -932,24 +971,33 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
+ /**
+ * Returns the status of the path on which SPS was invoked. This method is
+ * used by the internal storage policy satisfier service.
+ *
+ * @param path
+ * sps path
+ * @return storage policy satisfy path status
+ * @throws IOException
+ */
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
return storageMovementNeeded.getStatus(ctxt.getFileID(path));
}
@Override
- public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
+ public void addFileToProcess(ItemInfo<T> trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted);
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
- + "storageMovementNeeded queue", trackInfo.getFileId());
+ + "storageMovementNeeded queue", trackInfo.getFile());
}
}
@Override
- public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+ public void addAllFilesToProcess(T startPath, List<ItemInfo<T>> itemInfoList,
boolean scanCompleted) {
- getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted);
+ getStorageMovementQueue().addAll(startPath, itemInfoList, scanCompleted);
}
@Override
@@ -963,12 +1011,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@VisibleForTesting
- public BlockStorageMovementNeeded getStorageMovementQueue() {
+ public BlockStorageMovementNeeded<T> getStorageMovementQueue() {
return storageMovementNeeded;
}
@Override
- public void markScanCompletedForPath(Long inodeId) {
+ public void markScanCompletedForPath(T inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId);
}
@@ -976,7 +1024,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* Join main SPS thread.
*/
public void join() throws InterruptedException {
- //TODO Add join here on SPS rpc server also
storagePolicySatisfierThread.join();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
index 5bdf6ae..5ec0372 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
public class StoragePolicySatisfyManager {
private static final Logger LOG = LoggerFactory
.getLogger(StoragePolicySatisfyManager.class);
- private final StoragePolicySatisfier spsService;
+ private final StoragePolicySatisfier<Long> spsService;
private final boolean storagePolicyEnabled;
private volatile StoragePolicySatisfierMode mode;
private final Queue<Long> pathsToBeTraveresed;
@@ -84,7 +84,7 @@ public class StoragePolicySatisfyManager {
pathsToBeTraveresed = new LinkedList<Long>();
// instantiate SPS service by just keeps config reference and not starting
// any supporting threads.
- spsService = new StoragePolicySatisfier(conf);
+ spsService = new StoragePolicySatisfier<Long>(conf);
this.namesystem = namesystem;
this.blkMgr = blkMgr;
}
@@ -309,7 +309,7 @@ public class StoragePolicySatisfyManager {
/**
* @return internal SPS service instance.
*/
- public SPSService getInternalSPSService() {
+ public SPSService<Long> getInternalSPSService() {
return this.spsService;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
index 4e37c46..f5c0161 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
@@ -205,21 +205,11 @@ public interface NamenodeProtocol {
boolean isRollingUpgrade() throws IOException;
/**
- * Gets the file path for the given file id. This API used by External SPS.
- *
- * @param inodeId
- * - file inode id.
- * @return path
- */
- @Idempotent
- String getFilePath(Long inodeId) throws IOException;
-
- /**
- * @return Gets the next available sps path id, otherwise null. This API used
+ * @return the next available sps path, otherwise null. This API is used
* by External SPS.
*/
@AtMostOnce
- Long getNextSPSPathId() throws IOException;
+ String getNextSPSPath() throws IOException;
/**
* Verifies whether the given Datanode has the enough estimated size with
@@ -235,15 +225,5 @@ public interface NamenodeProtocol {
@Idempotent
boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) throws IOException;
-
- /**
- * Check if any low redundancy blocks for given file id. This API used by
- * External SPS.
- *
- * @param inodeID
- * - inode id.
- */
- @Idempotent
- boolean hasLowRedundancyBlocks(long inodeID) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index 4a762649..7580ba9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -81,11 +81,11 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
private final SaslDataTransferClient saslClient;
private final BlockStorageMovementTracker blkMovementTracker;
private Daemon movementTrackerThread;
- private final SPSService service;
+ private final SPSService<String> service;
private final BlockDispatcher blkDispatcher;
public ExternalSPSBlockMoveTaskHandler(Configuration conf,
- NameNodeConnector nnc, SPSService spsService) {
+ NameNodeConnector nnc, SPSService<String> spsService) {
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3634bdcc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index c309209..5d0aee6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.sps;
+import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -46,15 +48,15 @@ import org.slf4j.LoggerFactory;
* SPS from Namenode state.
*/
@InterfaceAudience.Private
-public class ExternalSPSContext implements Context {
+public class ExternalSPSContext implements Context<String> {
public static final Logger LOG =
LoggerFactory.getLogger(ExternalSPSContext.class);
- private SPSService service;
+ private SPSService<String> service;
private NameNodeConnector nnc = null;
private BlockStoragePolicySuite createDefaultSuite =
BlockStoragePolicySuite.createDefaultSuite();
- public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
+ public ExternalSPSContext(SPSService<String> service, NameNodeConnector nnc) {
this.service = service;
this.nnc = nnc;
}
@@ -110,14 +112,12 @@ public class ExternalSPSContext implements Context {
}
@Override
- public boolean isFileExist(long inodeId) {
- String filePath = null;
+ public boolean isFileExist(String filePath) {
try {
- filePath = getFilePath(inodeId);
return nnc.getDistributedFileSystem().exists(new Path(filePath));
} catch (IllegalArgumentException | IOException e) {
- LOG.warn("Exception while getting file is for the given path:{} "
- + "and fileId:{}", filePath, inodeId, e);
+ LOG.warn("Exception while getting file is for the given path:{}",
+ filePath, e);
}
return false;
}
@@ -133,8 +133,8 @@ public class ExternalSPSContext implements Context {
}
@Override
- public void removeSPSHint(long inodeId) throws IOException {
- nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
+ public void removeSPSHint(String inodeId) throws IOException {
+ nnc.getDistributedFileSystem().removeXAttr(new Path(inodeId),
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
}
@@ -150,9 +150,15 @@ public class ExternalSPSContext implements Context {
}
@Override
- public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
- return nnc.getDistributedFileSystem().getClient()
- .getLocatedFileInfo(getFilePath(inodeID), false);
+ public HdfsFileStatus getFileInfo(String path) throws IOException {
+ HdfsLocatedFileStatus fileInfo = null;
+ try {
+ fileInfo = nnc.getDistributedFileSystem().getClient()
+ .getLocatedFileInfo(path, false);
+ } catch (FileNotFoundException e) {
+ LOG.debug("Path:{} doesn't exists!", path, e);
+ }
+ return fileInfo;
}
@Override
@@ -162,17 +168,6 @@ public class ExternalSPSContext implements Context {
}
@Override
- public boolean hasLowRedundancyBlocks(long inodeID) {
- try {
- return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
- } catch (IOException e) {
- LOG.warn("Failed to check whether fileid:{} has low redundancy blocks.",
- inodeID, e);
- return false;
- }
- }
-
- @Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) {
// TODO: Instead of calling namenode for checking the available space, it
@@ -190,9 +185,9 @@ public class ExternalSPSContext implements Context {
}
@Override
- public Long getNextSPSPathId() {
+ public String getNextSPSPath() {
try {
- return nnc.getNNProtocolConnection().getNextSPSPathId();
+ return nnc.getNNProtocolConnection().getNextSPSPath();
} catch (IOException e) {
LOG.warn("Exception while getting next sps path id from Namenode.", e);
return null;
@@ -200,7 +195,7 @@ public class ExternalSPSContext implements Context {
}
@Override
- public void removeSPSPathId(long pathId) {
+ public void removeSPSPathId(String pathId) {
// We need not specifically implement for external.
}
@@ -208,15 +203,4 @@ public class ExternalSPSContext implements Context {
public void removeAllSPSPathIds() {
// We need not specifically implement for external.
}
-
- @Override
- public String getFilePath(Long inodeId) {
- try {
- return nnc.getNNProtocolConnection().getFilePath(inodeId);
- } catch (IOException e) {
- LOG.warn("Exception while getting file path id:{} from Namenode.",
- inodeId, e);
- return null;
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org