You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/24 07:32:41 UTC
[46/50] [abbrv] hadoop git commit: HDFS-12982 : [SPS]: Reduce the
locking and cleanup the Namesystem access. Contributed by Rakesh R.
HDFS-12982 : [SPS]: Reduce the locking and cleanup the Namesystem access. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcce97f8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcce97f8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcce97f8
Branch: refs/heads/HDFS-10285
Commit: bcce97f87a9aeb2cf20acef0cc3d22dab1f5120c
Parents: 343d9cbd
Author: Surendra Singh Lilhore <su...@apache.org>
Authored: Mon Jan 8 15:13:11 2018 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Wed Jan 24 12:27:17 2018 +0530
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 16 +-
.../blockmanagement/DatanodeDescriptor.java | 2 +-
.../server/blockmanagement/DatanodeManager.java | 22 ++
.../server/namenode/FSDirStatAndListingOp.java | 1 +
.../hdfs/server/namenode/FSNamesystem.java | 44 ++-
.../hdfs/server/namenode/IntraNNSPSContext.java | 41 --
.../hadoop/hdfs/server/namenode/Namesystem.java | 24 ++
.../sps/BlockStorageMovementAttemptedItems.java | 17 +-
.../sps/BlockStorageMovementNeeded.java | 48 ++-
.../hdfs/server/namenode/sps/Context.java | 181 +++++++++
.../namenode/sps/IntraSPSNameNodeContext.java | 220 +++++++++++
.../namenode/sps/StoragePolicySatisfier.java | 374 +++++++++----------
.../TestBlockStorageMovementAttemptedItems.java | 17 +-
.../sps/TestStoragePolicySatisfier.java | 25 +-
14 files changed, 742 insertions(+), 290 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5ee869e..e97fb46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,11 +89,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
-import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -440,6 +441,7 @@ public class BlockManager implements BlockStatsMXBean {
private final StoragePolicySatisfier sps;
private final boolean storagePolicyEnabled;
private boolean spsEnabled;
+ private Context spsctxt = null;
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/
@@ -485,8 +487,8 @@ public class BlockManager implements BlockStatsMXBean {
conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
- StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
- sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
+ spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf);
+ sps = new StoragePolicySatisfier(spsctxt);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -5032,8 +5034,8 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is already running.");
return;
}
-
- sps.start(false);
+ // TODO: FSDirectory will get removed via HDFS-12911 modularization work
+ sps.start(false, namesystem.getFSDirectory());
}
/**
@@ -5069,8 +5071,8 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is already running.");
return;
}
-
- sps.start(true);
+ // TODO: FSDirectory will get removed via HDFS-12911 modularization work
+ sps.start(true, namesystem.getFSDirectory());
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index f9a76b4..b09d908 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -802,7 +802,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
/** Increment the number of blocks scheduled. */
- void incrementBlocksScheduled(StorageType t) {
+ public void incrementBlocksScheduled(StorageType t) {
currApproxBlocksScheduled.add(t, 1);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 35d0f41..fd1c95b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@@ -2045,5 +2046,26 @@ public class DatanodeManager {
}
}
}
+
+ /**
+ * Generates datanode reports for the given report type.
+ *
+ * @param type
+ * type of the datanode report
+ * @return array of DatanodeStorageReports
+ */
+ public DatanodeStorageReport[] getDatanodeStorageReport(
+ DatanodeReportType type) {
+ final List<DatanodeDescriptor> datanodes = getDatanodeListForReport(type);
+
+ DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes
+ .size()];
+ for (int i = 0; i < reports.length; i++) {
+ final DatanodeDescriptor d = datanodes.get(i);
+ reports[i] = new DatanodeStorageReport(
+ new DatanodeInfoBuilder().setFrom(d).build(), d.getStorageReports());
+ }
+ return reports;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 8b77034..517aae1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -89,6 +89,7 @@ class FSDirStatAndListingOp {
* @param srcArg The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
+ * @param needLocation if blockLocations need to be returned
*
* @param needLocation Include {@link LocatedBlocks} in result.
* @param needBlockToken Include block tokens in {@link LocatedBlocks}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8a36257..b40a8d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3093,6 +3093,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param src The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
+ * @param needLocation if blockLocations need to be returned
+ *
+ * @throws AccessControlException
+ * if access is denied
+ * @throws UnresolvedLinkException
+ * if a symlink is encountered.
+ *
+ * @return object containing information regarding the file or null if file
+ * not found
+ * @throws StandbyException
+ */
+ @Override
+ public HdfsFileStatus getFileInfo(final String src, boolean resolveLink,
+ boolean needLocation) throws IOException {
+ return getFileInfo(src, resolveLink, needLocation, false);
+ }
+
+ /**
+ * Get the file info for a specific file.
+ *
+ * @param src The string representation of the path to the file
+ * @param resolveLink whether to throw UnresolvedLinkException
+ * if src refers to a symlink
*
* @param needLocation Include {@link LocatedBlocks} in result.
* @param needBlockToken Include block tokens in {@link LocatedBlocks}
@@ -3126,6 +3149,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return stat;
}
+ @Override
+ public String getFilePath(Long inodeId) {
+ readLock();
+ try {
+ INode inode = getFSDirectory().getInode(inodeId);
+ return inode == null ? null : inode.getFullPathName();
+ } finally {
+ readUnlock();
+ }
+ }
+
/**
* Returns true if the file is closed
*/
@@ -4411,15 +4445,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
checkOperation(OperationCategory.UNCHECKED);
final DatanodeManager dm = getBlockManager().getDatanodeManager();
- final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
-
- reports = new DatanodeStorageReport[datanodes.size()];
- for (int i = 0; i < reports.length; i++) {
- final DatanodeDescriptor d = datanodes.get(i);
- reports[i] = new DatanodeStorageReport(
- new DatanodeInfoBuilder().setFrom(d).build(),
- d.getStorageReports());
- }
+ reports = dm.getDatanodeStorageReport(type);
} finally {
readUnlock("getDatanodeStorageReport");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
deleted file mode 100644
index 111cabb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
-
-/**
- * This class is the Namenode implementation for analyzing the file blocks which
- * are expecting to change its storages and assigning the block storage
- * movements to satisfy the storage policy.
- */
-// TODO: Now, added one API which is required for sps package. Will refine
-// this interface via HDFS-12911.
-public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
- private final Namesystem namesystem;
-
- public IntraNNSPSContext(Namesystem namesystem) {
- this.namesystem = namesystem;
- }
-
- @Override
- public int getNumLiveDataNodes() {
- return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
- .getNumLiveDataNodes();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index e58fa72..fc933b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.util.RwLock;
@@ -62,4 +63,27 @@ public interface Namesystem extends RwLock, SafeMode {
* @throws IOException
*/
void removeXattr(long id, String xattrName) throws IOException;
+
+ /**
+ * Gets the fileInfo of the given file path.
+ *
+ * @param filePath string representation of the path to the file
+ * @param resolveLink whether to throw UnresolvedLinkException
+ * if src refers to a symlink
+ * @param needLocation if blockLocations need to be returned
+ *
+ * @return hdfs file status details
+ * @throws IOException
+ */
+ HdfsFileStatus getFileInfo(String filePath, boolean resolveLink,
+ boolean needLocation) throws IOException;
+
+ /**
+ * Gets the file path corresponds to the given file id.
+ *
+ * @param inodeId
+ * file id
+ * @return string file path
+ */
+ String getFilePath(Long inodeId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index b044f30..1cae027 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -25,6 +25,11 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY;
+
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
@@ -66,15 +71,21 @@ public class BlockStorageMovementAttemptedItems {
//
private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded;
+ private final Context ctxt;
- public BlockStorageMovementAttemptedItems(long recheckTimeout,
- long selfRetryTimeout,
+ public BlockStorageMovementAttemptedItems(Context context,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+ this.ctxt = context;
+ long recheckTimeout = ctxt.getConf().getLong(
+ DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+ DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT);
if (recheckTimeout > 0) {
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
}
- this.selfRetryTimeout = selfRetryTimeout;
+ this.selfRetryTimeout = ctxt.getConf().getLong(
+ DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
+ DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new ArrayList<>();
movementFinishedBlocks = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 5635621..80f1893 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
-import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
import java.io.IOException;
import java.util.ArrayList;
@@ -35,10 +36,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathSta
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@@ -73,13 +73,11 @@ public class BlockStorageMovementNeeded {
private final Map<Long, StoragePolicySatisfyPathStatusInfo> spsStatus =
new ConcurrentHashMap<>();
- private final Namesystem namesystem;
+ private final Context ctxt;
// List of pending dir to satisfy the policy
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
- private final StoragePolicySatisfier sps;
-
private Daemon inodeIdCollector;
private final int maxQueuedItem;
@@ -88,11 +86,11 @@ public class BlockStorageMovementNeeded {
// NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000;
- public BlockStorageMovementNeeded(Namesystem namesystem,
- StoragePolicySatisfier sps, int queueLimit) {
- this.namesystem = namesystem;
- this.sps = sps;
- this.maxQueuedItem = queueLimit;
+ public BlockStorageMovementNeeded(Context context) {
+ this.ctxt = context;
+ this.maxQueuedItem = ctxt.getConf().getInt(
+ DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
}
/**
@@ -188,8 +186,7 @@ public class BlockStorageMovementNeeded {
// If track is part of some start inode then reduce the pending
// directory work count.
long startId = trackInfo.getStartId();
- INode inode = namesystem.getFSDirectory().getInode(startId);
- if (inode == null) {
+ if (!ctxt.isFileExist(startId)) {
// directory deleted just remove it.
this.pendingWorkForDirectory.remove(startId);
updateStatus(startId, isSuccess);
@@ -198,7 +195,7 @@ public class BlockStorageMovementNeeded {
if (pendingWork != null) {
pendingWork.decrementPendingWorkCount();
if (pendingWork.isDirWorkDone()) {
- namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+ ctxt.removeSPSHint(startId);
pendingWorkForDirectory.remove(startId);
pendingWork.setFailure(!isSuccess);
updateStatus(startId, pendingWork.isPolicySatisfied());
@@ -209,8 +206,7 @@ public class BlockStorageMovementNeeded {
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
- namesystem.removeXattr(trackInfo.getTrackId(),
- XATTR_SATISFY_STORAGE_POLICY);
+ ctxt.removeSPSHint(trackInfo.getTrackId());
updateStatus(trackInfo.getStartId(), isSuccess);
}
}
@@ -256,7 +252,7 @@ public class BlockStorageMovementNeeded {
while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
try {
// Remove xAttr for file
- namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY);
+ ctxt.removeSPSHint(trackId);
} catch (IOException ie) {
LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie);
}
@@ -269,8 +265,7 @@ public class BlockStorageMovementNeeded {
try {
// Remove xAttr for file
if (!itemInfo.isDir()) {
- namesystem.removeXattr(itemInfo.getTrackId(),
- XATTR_SATISFY_STORAGE_POLICY);
+ ctxt.removeSPSHint(itemInfo.getTrackId());
}
} catch (IOException ie) {
LOG.warn(
@@ -300,10 +295,9 @@ public class BlockStorageMovementNeeded {
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
long lastStatusCleanTime = 0;
- while (namesystem.isRunning() && sps.isRunning()) {
+ while (ctxt.isRunning()) {
try {
- if (!namesystem.isInSafeMode()) {
- FSDirectory fsd = namesystem.getFSDirectory();
+ if (!ctxt.isInSafeMode()) {
Long startINodeId = spsDirsToBeTraveresed.poll();
if (startINodeId == null) {
// Waiting for SPS path
@@ -311,7 +305,7 @@ public class BlockStorageMovementNeeded {
spsDirsToBeTraveresed.wait(5000);
}
} else {
- INode startInode = fsd.getInode(startINodeId);
+ INode startInode = getFSDirectory().getInode(startINodeId);
if (startInode != null) {
try {
remainingCapacity = remainingCapacity();
@@ -333,8 +327,7 @@ public class BlockStorageMovementNeeded {
DirPendingWorkInfo dirPendingWorkInfo =
pendingWorkForDirectory.get(startInode.getId());
if (dirPendingWorkInfo.isDirWorkDone()) {
- namesystem.removeXattr(startInode.getId(),
- XATTR_SATISFY_STORAGE_POLICY);
+ ctxt.removeSPSHint(startInode.getId());
pendingWorkForDirectory.remove(startInode.getId());
updateStatus(startInode.getId(), true);
}
@@ -483,9 +476,10 @@ public class BlockStorageMovementNeeded {
}
}
- public void init() {
+ // TODO: FSDirectory will get removed via HDFS-12911 modularization work
+ public void init(FSDirectory fsd) {
inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
- namesystem.getFSDirectory()));
+ fsd));
inodeIdCollector.setName("FileInodeIdCollector");
inodeIdCollector.start();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
new file mode 100644
index 0000000..d11e26f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessControlException;
+
+/**
+ * An interface for the communication between NameNode and SPS module.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface Context {
+
+ /**
+ * Returns configuration object.
+ */
+ Configuration getConf();
+
+ /**
+ * Returns true if the SPS is running, false otherwise.
+ */
+ boolean isRunning();
+
+ /**
+ * Update the SPS running status.
+ *
+ * @param isSpsRunning
+ * true represents running, false otherwise
+ */
+ void setSPSRunning(Supplier<Boolean> isSpsRunning);
+
+ /**
+ * Returns true if the Namenode in safe mode, false otherwise.
+ */
+ boolean isInSafeMode();
+
+ /**
+ * Returns true if Mover tool is already running, false otherwise.
+ */
+ boolean isMoverRunning();
+
+ /**
+ * Gets the Inode ID number for the given path.
+ *
+ * @param path
+ * - file/dir path
+ * @return Inode id number
+ */
+ long getFileID(String path) throws UnresolvedLinkException,
+ AccessControlException, ParentNotDirectoryException;
+
+ /**
+ * Gets the network topology.
+ *
+ * @return network topology
+ */
+ NetworkTopology getNetworkTopology();
+
+ /**
+ * Returns true if the give Inode exists in the Namespace.
+ *
+ * @param inodeId
+ * - Inode ID
+ * @return true if Inode exists, false otherwise.
+ */
+ boolean isFileExist(long inodeId);
+
+ /**
+ * Gets the storage policy details for the given policy ID.
+ *
+ * @param policyId
+ * - Storage policy ID
+ * @return the detailed policy object
+ */
+ BlockStoragePolicy getStoragePolicy(byte policyId);
+
+ /**
+ * Drop the SPS work in case if any previous work queued up.
+ */
+ void addDropPreviousSPSWorkAtDNs();
+
+ /**
+ * Remove the hint which was added to track SPS call.
+ *
+ * @param inodeId
+ * - Inode ID
+ * @throws IOException
+ */
+ void removeSPSHint(long inodeId) throws IOException;
+
+ /**
+ * Gets the number of live datanodes in the cluster.
+ *
+ * @return number of live datanodes
+ */
+ int getNumLiveDataNodes();
+
+ /**
+ * Get the file info for a specific file.
+ *
+ * @param inodeID
+ * inode identifier
+ * @return file status metadata information
+ */
+ HdfsFileStatus getFileInfo(long inodeID) throws IOException;
+
+ /**
+ * Returns all the live datanodes and its storage details.
+ *
+ * @throws IOException
+ */
+ DatanodeStorageReport[] getLiveDatanodeStorageReport()
+ throws IOException;
+
+ /**
+ * Returns true if the given inode file has low redundancy blocks.
+ *
+ * @param inodeID
+ * inode identifier
+ * @return true if block collection has low redundancy blocks
+ */
+ boolean hasLowRedundancyBlocks(long inodeID);
+
+ /**
+ * Assign the given block movement task to the target node present in
+ * {@link BlockMovingInfo}.
+ *
+ * @param blkMovingInfo
+ * block to storage info
+ * @throws IOException
+ */
+ void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
+ throws IOException;
+
+ /**
+ * Checks whether the given datanode has sufficient space to occupy the given
+ * blockSize data.
+ *
+ * @param dn
+ * datanode info
+ * @param type
+ * storage type
+ * @param blockSize
+ * blockSize to be scheduled
+ * @return true if the given datanode has sufficient space to occupy blockSize
+ * data, false otherwise.
+ */
+ boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
+ StorageType type, long blockSize);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
new file mode 100644
index 0000000..6654212
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessControlException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is the Namenode implementation for analyzing the file blocks which
+ * are expecting to change its storages and assigning the block storage
+ * movements to satisfy the storage policy.
+ */
+public class IntraSPSNameNodeContext implements Context {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(IntraSPSNameNodeContext.class);
+
+ private final Namesystem namesystem;
+ private final BlockManager blockManager;
+ private final Configuration conf;
+ private Supplier<Boolean> isSpsRunning;
+
+ public IntraSPSNameNodeContext(Namesystem namesystem,
+ BlockManager blockManager, Configuration conf) {
+ this.namesystem = namesystem;
+ this.blockManager = blockManager;
+ this.conf = conf;
+ isSpsRunning = () -> false;
+ }
+
+ @Override
+ public int getNumLiveDataNodes() {
+ return blockManager.getDatanodeManager().getNumLiveDataNodes();
+ }
+
+ @Override
+ public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
+ String filePath = namesystem.getFilePath(inodeID);
+ if (StringUtils.isBlank(filePath)) {
+ LOG.debug("File with inodeID:{} doesn't exists!", inodeID);
+ return null;
+ }
+ HdfsFileStatus fileInfo = null;
+ try {
+ fileInfo = namesystem.getFileInfo(filePath, true, true);
+ } catch (IOException e) {
+ LOG.debug("File path:{} doesn't exists!", filePath);
+ }
+ return fileInfo;
+ }
+
+ @Override
+ public DatanodeStorageReport[] getLiveDatanodeStorageReport()
+ throws IOException {
+ namesystem.readLock();
+ try {
+ return blockManager.getDatanodeManager()
+ .getDatanodeStorageReport(DatanodeReportType.LIVE);
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+
+ @Override
+ public boolean hasLowRedundancyBlocks(long inodeID) {
+ namesystem.readLock();
+ try {
+ BlockCollection bc = namesystem.getBlockCollection(inodeID);
+ return blockManager.hasLowRedundancyBlocks(bc);
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public boolean isFileExist(long inodeId) {
+ return namesystem.getFSDirectory().getInode(inodeId) != null;
+ }
+
+ @Override
+ public void removeSPSHint(long inodeId) throws IOException {
+ this.namesystem.removeXattr(inodeId, XATTR_SATISFY_STORAGE_POLICY);
+ }
+
+ @Override
+ public boolean isRunning() {
+ // TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside
+ // SPS. Context interface will be further refined as part of HDFS-12911
+ // modularization task. One idea is to introduce a cleaner interface similar
+ // to Namesystem for better abstraction.
+ return namesystem.isRunning() && isSpsRunning.get();
+ }
+
+ @Override
+ public void setSPSRunning(Supplier<Boolean> spsRunningFlag) {
+ this.isSpsRunning = spsRunningFlag;
+ }
+
+ @Override
+ public boolean isInSafeMode() {
+ return namesystem.isInSafeMode();
+ }
+
+ @Override
+ public boolean isMoverRunning() {
+ String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
+ return namesystem.isFileOpenedForWrite(moverId);
+ }
+
+ @Override
+ public void addDropPreviousSPSWorkAtDNs() {
+ namesystem.readLock();
+ try {
+ blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+
+ @Override
+ public BlockStoragePolicy getStoragePolicy(byte policyID) {
+ return blockManager.getStoragePolicy(policyID);
+ }
+
+ @Override
+ public NetworkTopology getNetworkTopology() {
+ return blockManager.getDatanodeManager().getNetworkTopology();
+ }
+
+ @Override
+ public long getFileID(String path) throws UnresolvedLinkException,
+ AccessControlException, ParentNotDirectoryException {
+ namesystem.readLock();
+ try {
+ INode inode = namesystem.getFSDirectory().getINode(path);
+ return inode == null ? -1 : inode.getId();
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+
+ @Override
+ public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
+ throws IOException {
+ namesystem.readLock();
+ try {
+ DatanodeDescriptor dn = blockManager.getDatanodeManager()
+ .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
+ if (dn == null) {
+ throw new IOException("Failed to schedule block movement task:"
+ + blkMovingInfo + " as target datanode: "
+ + blkMovingInfo.getTarget() + " doesn't exists");
+ }
+ dn.addBlocksToMoveStorage(blkMovingInfo);
+ dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+
+ @Override
+ public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
+ StorageType type, long blockSize) {
+ namesystem.readLock();
+ try {
+ DatanodeDescriptor datanode = blockManager.getDatanodeManager()
+ .getDatanode(dn.getDatanodeUuid());
+ if (datanode == null) {
+ LOG.debug("Target datanode: " + dn + " doesn't exists");
+ return false;
+ }
+ return null != datanode.chooseStorage4Block(type, blockSize);
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 0d4bb19..b3e6b78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -29,29 +29,28 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
@@ -79,8 +78,6 @@ public class StoragePolicySatisfier implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
- private final Namesystem namesystem;
- private final BlockManager blockManager;
private final BlockStorageMovementNeeded storageMovementNeeded;
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
@@ -90,16 +87,6 @@ public class StoragePolicySatisfier implements Runnable {
private final Context ctxt;
/**
- * An interface for analyzing and assigning the block storage movements to
- * worker nodes.
- */
- // TODO: Now, added one API which is required for sps package. Will refine
- // this interface via HDFS-12911.
- public interface Context {
- int getNumLiveDataNodes();
- }
-
- /**
* Represents the collective analysis status for all blocks.
*/
private static class BlocksMovingAnalysis {
@@ -124,7 +111,9 @@ public class StoragePolicySatisfier implements Runnable {
BLOCKS_TARGET_PAIRING_SKIPPED,
// Represents that, All the reported blocks are satisfied the policy but
// some of the blocks are low redundant.
- FEW_LOW_REDUNDANCY_BLOCKS
+ FEW_LOW_REDUNDANCY_BLOCKS,
+ // Represents that, movement failures due to unexpected errors.
+ BLOCKS_FAILED_TO_MOVE
}
private Status status = null;
@@ -136,36 +125,27 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- public StoragePolicySatisfier(final Namesystem namesystem,
- final BlockManager blkManager, Configuration conf, Context ctxt) {
- this.namesystem = namesystem;
- this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
- this, conf.getInt(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
- this.blockManager = blkManager;
- this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
- conf.getLong(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT),
- conf.getLong(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
+ public StoragePolicySatisfier(Context ctxt) {
+ this.ctxt = ctxt;
+ this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt);
+ this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt,
storageMovementNeeded);
- this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
- this.blockMovementMaxRetry = conf.getInt(
+ this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf());
+ this.blockMovementMaxRetry = ctxt.getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
- this.ctxt = ctxt;
}
/**
* Start storage policy satisfier demon thread. Also start block storage
* movements monitor for retry the attempts if needed.
+ *
+ * // TODO: FSDirectory will get removed via HDFS-12911 modularization work.
*/
- public synchronized void start(boolean reconfigStart) {
+ public synchronized void start(boolean reconfigStart, FSDirectory fsd) {
isRunning = true;
- if (checkIfMoverRunning()) {
+ ctxt.setSPSRunning(this::isRunning);
+ if (ctxt.isMoverRunning()) {
isRunning = false;
LOG.error(
"Stopping StoragePolicySatisfier thread " + "as Mover ID file "
@@ -183,7 +163,7 @@ public class StoragePolicySatisfier implements Runnable {
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
- storageMovementNeeded.init();
+ storageMovementNeeded.init(fsd);
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@@ -199,7 +179,6 @@ public class StoragePolicySatisfier implements Runnable {
*/
public synchronized void disable(boolean forceStop) {
isRunning = false;
-
if (storagePolicySatisfierThread == null) {
return;
}
@@ -242,25 +221,19 @@ public class StoragePolicySatisfier implements Runnable {
return isRunning;
}
- // Return true if a Mover instance is running
- private boolean checkIfMoverRunning() {
- String moverId = HdfsServerConstants.MOVER_ID_PATH.toString();
- return namesystem.isFileOpenedForWrite(moverId);
- }
-
/**
* Adding drop commands to all datanodes to stop performing the satisfier
* block movements, if any.
*/
private void addDropSPSWorkCommandsToAllDNs() {
- this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+ ctxt.addDropPreviousSPSWorkAtDNs();
}
@Override
public void run() {
- while (namesystem.isRunning() && isRunning) {
+ while (ctxt.isRunning()) {
try {
- if (!namesystem.isInSafeMode()) {
+ if (!ctxt.isInSafeMode()) {
ItemInfo itemInfo = storageMovementNeeded.get();
if (itemInfo != null) {
if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
@@ -271,25 +244,28 @@ public class StoragePolicySatisfier implements Runnable {
continue;
}
long trackId = itemInfo.getTrackId();
- BlockCollection blockCollection;
BlocksMovingAnalysis status = null;
- try {
- namesystem.readLock();
- blockCollection = namesystem.getBlockCollection(trackId);
- // Check blockCollectionId existence.
- if (blockCollection == null) {
- // File doesn't exists (maybe got deleted), remove trackId from
- // the queue
- storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
- } else {
- status =
- analyseBlocksStorageMovementsAndAssignToDN(
- blockCollection);
- }
- } finally {
- namesystem.readUnlock();
- }
- if (blockCollection != null) {
+ DatanodeStorageReport[] liveDnReports;
+ BlockStoragePolicy existingStoragePolicy;
+ // TODO: presently, context internally acquire the lock
+ // and returns the result. Need to discuss to move the lock outside?
+ boolean hasLowRedundancyBlocks = ctxt
+ .hasLowRedundancyBlocks(trackId);
+ HdfsFileStatus fileStatus = ctxt.getFileInfo(trackId);
+ // Check path existence.
+ if (fileStatus == null || fileStatus.isDir()) {
+ // File doesn't exists (maybe got deleted) or its a directory,
+ // just remove trackId from the queue
+ storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
+ } else {
+ liveDnReports = ctxt.getLiveDatanodeStorageReport();
+ byte existingStoragePolicyID = fileStatus.getStoragePolicy();
+ existingStoragePolicy = ctxt
+ .getStoragePolicy(existingStoragePolicyID);
+
+ HdfsLocatedFileStatus file = (HdfsLocatedFileStatus) fileStatus;
+ status = analyseBlocksStorageMovementsAndAssignToDN(file,
+ hasLowRedundancyBlocks, existingStoragePolicy, liveDnReports);
switch (status.status) {
// Just add to monitor, so it will be retried after timeout
case ANALYSIS_SKIPPED_FOR_RETRY:
@@ -317,6 +293,14 @@ public class StoragePolicySatisfier implements Runnable {
}
this.storageMovementNeeded.add(itemInfo);
break;
+ case BLOCKS_FAILED_TO_MOVE:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding trackID " + trackId
+ + " back to retry queue as some of the blocks"
+ + " movement failed.");
+ }
+ this.storageMovementNeeded.add(itemInfo);
+ break;
// Just clean Xattrs
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
@@ -350,14 +334,11 @@ public class StoragePolicySatisfier implements Runnable {
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
- if (!namesystem.isRunning()) {
- LOG.info("Stopping StoragePolicySatisfier.");
- if (!(t instanceof InterruptedException)) {
- LOG.info("StoragePolicySatisfier received an exception"
- + " while shutting down.", t);
- }
- return;
+ if (!(t instanceof InterruptedException)) {
+ LOG.info("StoragePolicySatisfier received an exception"
+ + " while shutting down.", t);
}
+ LOG.info("Stopping StoragePolicySatisfier.");
}
}
}
@@ -367,41 +348,43 @@ public class StoragePolicySatisfier implements Runnable {
}
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
- BlockCollection blockCollection) {
+ HdfsLocatedFileStatus fileInfo, boolean hasLowRedundancyBlocks,
+ BlockStoragePolicy existingStoragePolicy,
+ DatanodeStorageReport[] liveDns) {
BlocksMovingAnalysis.Status status =
BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
- byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
- BlockStoragePolicy existingStoragePolicy =
- blockManager.getStoragePolicy(existingStoragePolicyID);
- if (!blockCollection.getLastBlock().isComplete()) {
+ final ErasureCodingPolicy ecPolicy = fileInfo.getErasureCodingPolicy();
+ final LocatedBlocks locatedBlocks = fileInfo.getLocatedBlocks();
+ final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
+ if (!lastBlkComplete) {
// Postpone, currently file is under construction
// So, should we add back? or leave it to user
LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
- + " this to the next retry iteration", blockCollection.getId());
+ + " this to the next retry iteration", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
new ArrayList<>());
}
- BlockInfo[] blocks = blockCollection.getBlocks();
- if (blocks.length == 0) {
+ List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
+ if (blocks.size() == 0) {
LOG.info("BlockCollectionID: {} file is not having any blocks."
- + " So, skipping the analysis.", blockCollection.getId());
+ + " So, skipping the analysis.", fileInfo.getFileId());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new ArrayList<>());
}
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
- for (int i = 0; i < blocks.length; i++) {
- BlockInfo blockInfo = blocks[i];
+ for (int i = 0; i < blocks.size(); i++) {
+ LocatedBlock blockInfo = blocks.get(i);
List<StorageType> expectedStorageTypes;
if (blockInfo.isStriped()) {
if (ErasureCodingPolicyManager
.checkStoragePolicySuitableForECStripedMode(
- existingStoragePolicyID)) {
+ existingStoragePolicy.getId())) {
expectedStorageTypes = existingStoragePolicy
- .chooseStorageTypes((short) blockInfo.getCapacity());
+ .chooseStorageTypes((short) blockInfo.getLocations().length);
} else {
// Currently we support only limited policies (HOT, COLD, ALLSSD)
// for EC striped mode files. SPS will ignore to move the blocks if
@@ -415,22 +398,16 @@ public class StoragePolicySatisfier implements Runnable {
}
} else {
expectedStorageTypes = existingStoragePolicy
- .chooseStorageTypes(blockInfo.getReplication());
+ .chooseStorageTypes(fileInfo.getReplication());
}
- DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
- StorageType[] storageTypes = new StorageType[storages.length];
- for (int j = 0; j < storages.length; j++) {
- DatanodeStorageInfo datanodeStorageInfo = storages[j];
- StorageType storageType = datanodeStorageInfo.getStorageType();
- storageTypes[j] = storageType;
- }
- List<StorageType> existing =
- new LinkedList<StorageType>(Arrays.asList(storageTypes));
+ List<StorageType> existing = new LinkedList<StorageType>(
+ Arrays.asList(blockInfo.getStorageTypes()));
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
- blockInfo, expectedStorageTypes, existing, storages);
+ blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
+ liveDns, ecPolicy);
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
} else {
@@ -439,7 +416,7 @@ public class StoragePolicySatisfier implements Runnable {
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
} else {
- if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
+ if (hasLowRedundancyBlocks) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
}
@@ -448,13 +425,15 @@ public class StoragePolicySatisfier implements Runnable {
List<Block> assignedBlockIds = new ArrayList<Block>();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
- if (blkMovingInfo.getTarget() != null) {
- // assign block storage movement task to the target node
- ((DatanodeDescriptor) blkMovingInfo.getTarget())
- .addBlocksToMoveStorage(blkMovingInfo);
+ try {
+ ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
assignedBlockIds.add(blkMovingInfo.getBlock());
blockCount++;
+ } catch (IOException e) {
+ LOG.warn("Exception while scheduling movement task", e);
+ // failed to move the block.
+ status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE;
}
}
return new BlocksMovingAnalysis(status, assignedBlockIds);
@@ -481,29 +460,29 @@ public class StoragePolicySatisfier implements Runnable {
* satisfy the storage policy, true otherwise
*/
private boolean computeBlockMovingInfos(
- List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+ List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageType> expectedStorageTypes, List<StorageType> existing,
- DatanodeStorageInfo[] storages) {
+ DatanodeInfo[] storages, DatanodeStorageReport[] liveDns,
+ ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
- List<DatanodeStorageInfo> existingBlockStorages =
- new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+ List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>(
+ Arrays.asList(storages));
// if expected type exists in source node already, local movement would be
// possible, so lets find such sources first.
- Iterator<DatanodeStorageInfo> iterator = existingBlockStorages.iterator();
+ Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator();
while (iterator.hasNext()) {
- DatanodeStorageInfo datanodeStorageInfo = iterator.next();
- if (checkSourceAndTargetTypeExists(
- datanodeStorageInfo.getDatanodeDescriptor(), existing,
- expectedStorageTypes)) {
+ DatanodeInfoWithStorage dnInfo = (DatanodeInfoWithStorage) iterator
+ .next();
+ if (checkSourceAndTargetTypeExists(dnInfo, existing,
+ expectedStorageTypes, liveDns)) {
sourceWithStorageMap
- .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(),
- datanodeStorageInfo.getDatanodeDescriptor()));
+ .add(new StorageTypeNodePair(dnInfo.getStorageType(), dnInfo));
iterator.remove();
- existing.remove(datanodeStorageInfo.getStorageType());
+ existing.remove(dnInfo.getStorageType());
}
}
@@ -511,23 +490,25 @@ public class StoragePolicySatisfier implements Runnable {
for (StorageType existingType : existing) {
iterator = existingBlockStorages.iterator();
while (iterator.hasNext()) {
- DatanodeStorageInfo datanodeStorageInfo = iterator.next();
- StorageType storageType = datanodeStorageInfo.getStorageType();
+ DatanodeInfoWithStorage dnStorageInfo =
+ (DatanodeInfoWithStorage) iterator.next();
+ StorageType storageType = dnStorageInfo.getStorageType();
if (storageType == existingType) {
iterator.remove();
sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
- datanodeStorageInfo.getDatanodeDescriptor()));
+ dnStorageInfo));
break;
}
}
}
StorageTypeNodeMap locsForExpectedStorageTypes =
- findTargetsForExpectedStorageTypes(expectedStorageTypes);
+ findTargetsForExpectedStorageTypes(expectedStorageTypes, liveDns);
foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
blockMovingInfos, blockInfo, sourceWithStorageMap,
- expectedStorageTypes, locsForExpectedStorageTypes);
+ expectedStorageTypes, locsForExpectedStorageTypes,
+ ecPolicy);
}
return foundMatchingTargetNodesForBlock;
}
@@ -550,12 +531,13 @@ public class StoragePolicySatisfier implements Runnable {
* satisfy the storage policy
*/
private boolean findSourceAndTargetToMove(
- List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+ List<BlockMovingInfo> blockMovingInfos, LocatedBlock blockInfo,
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expected,
- StorageTypeNodeMap locsForExpectedStorageTypes) {
+ StorageTypeNodeMap locsForExpectedStorageTypes,
+ ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
- List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
+ List<DatanodeInfo> excludeNodes = new ArrayList<>();
// Looping over all the source node locations and choose the target
// storage within same node if possible. This is done separately to
@@ -566,13 +548,14 @@ public class StoragePolicySatisfier implements Runnable {
// Check whether the block replica is already placed in the expected
// storage type in this source datanode.
if (!expected.contains(existingTypeNodePair.storageType)) {
- StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
- blockInfo, existingTypeNodePair.dn, expected);
+ StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(blockInfo,
+ existingTypeNodePair.dn, expected);
if (chosenTarget != null) {
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
- chosenTarget.storageType, blockMovingInfos);
+ chosenTarget.storageType, blockMovingInfos,
+ ecPolicy);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
@@ -596,7 +579,7 @@ public class StoragePolicySatisfier implements Runnable {
if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
continue;
}
- if (chosenTarget == null && blockManager.getDatanodeManager()
+ if (chosenTarget == null && ctxt
.getNetworkTopology().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
@@ -619,7 +602,7 @@ public class StoragePolicySatisfier implements Runnable {
if (blockInfo.isStriped()) {
buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
- chosenTarget.storageType, blockMovingInfos);
+ chosenTarget.storageType, blockMovingInfos, ecPolicy);
} else {
buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
existingTypeNodePair.storageType, chosenTarget.dn,
@@ -645,7 +628,7 @@ public class StoragePolicySatisfier implements Runnable {
}
private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
- DatanodeDescriptor dn) {
+ DatanodeInfo dn) {
for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
if (blockMovingInfo.getSource().equals(dn)) {
return true;
@@ -654,37 +637,40 @@ public class StoragePolicySatisfier implements Runnable {
return false;
}
- private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
+ private void buildContinuousBlockMovingInfos(LocatedBlock blockInfo,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
List<BlockMovingInfo> blkMovingInfos) {
- Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
- blockInfo.getGenerationStamp());
+ Block blk = ExtendedBlock.getLocalBlock(blockInfo.getBlock());
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
}
- private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
+ private void buildStripedBlockMovingInfos(LocatedBlock blockInfo,
DatanodeInfo sourceNode, StorageType sourceStorageType,
DatanodeInfo targetNode, StorageType targetStorageType,
- List<BlockMovingInfo> blkMovingInfos) {
+ List<BlockMovingInfo> blkMovingInfos, ErasureCodingPolicy ecPolicy) {
// For a striped block, it needs to construct internal block at the given
// index of a block group. Here it is iterating over all the block indices
// and construct internal blocks which can be then considered for block
// movement.
- BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
- for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
- if (si.getBlockIndex() >= 0) {
- DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
- if (sourceNode.equals(dn)) {
+ LocatedStripedBlock sBlockInfo = (LocatedStripedBlock) blockInfo;
+ byte[] indices = sBlockInfo.getBlockIndices();
+ DatanodeInfo[] locations = sBlockInfo.getLocations();
+ for (int i = 0; i < indices.length; i++) {
+ byte blkIndex = indices[i];
+ if (blkIndex >= 0) {
+ // pick block movement only for the given source node.
+ if (sourceNode.equals(locations[i])) {
// construct internal block
- long blockId = blockInfo.getBlockId() + si.getBlockIndex();
+ ExtendedBlock extBlock = sBlockInfo.getBlock();
long numBytes = StripedBlockUtil.getInternalBlockLength(
- sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
- sBlockInfo.getDataBlockNum(), si.getBlockIndex());
- Block blk = new Block(blockId, numBytes,
- blockInfo.getGenerationStamp());
+ extBlock.getNumBytes(), ecPolicy, blkIndex);
+ Block blk = new Block(ExtendedBlock.getLocalBlock(extBlock));
+ long blkId = blk.getBlockId() + blkIndex;
+ blk.setBlockId(blkId);
+ blk.setNumBytes(numBytes);
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
targetNode, sourceStorageType, targetStorageType);
blkMovingInfos.add(blkMovingInfo);
@@ -703,34 +689,35 @@ public class StoragePolicySatisfier implements Runnable {
* @param targetTypes
* - list of target storage types
*/
- private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
- DatanodeDescriptor source, List<StorageType> targetTypes) {
+ private StorageTypeNodePair chooseTargetTypeInSameNode(LocatedBlock blockInfo,
+ DatanodeInfo source, List<StorageType> targetTypes) {
for (StorageType t : targetTypes) {
- DatanodeStorageInfo chooseStorage4Block =
- source.chooseStorage4Block(t, block.getNumBytes());
- if (chooseStorage4Block != null) {
+ boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
+ source, t, blockInfo.getBlockSize());
+ if (goodTargetDn) {
return new StorageTypeNodePair(t, source);
}
}
return null;
}
- private StorageTypeNodePair chooseTarget(Block block,
- DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
+ private StorageTypeNodePair chooseTarget(LocatedBlock block,
+ DatanodeInfo source, List<StorageType> targetTypes, Matcher matcher,
StorageTypeNodeMap locsForExpectedStorageTypes,
- List<DatanodeDescriptor> excludeNodes) {
+ List<DatanodeInfo> excludeNodes) {
for (StorageType t : targetTypes) {
- List<DatanodeDescriptor> nodesWithStorages =
- locsForExpectedStorageTypes.getNodesWithStorages(t);
+ List<DatanodeInfo> nodesWithStorages = locsForExpectedStorageTypes
+ .getNodesWithStorages(t);
if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
continue; // no target nodes with the required storage type.
}
Collections.shuffle(nodesWithStorages);
- for (DatanodeDescriptor target : nodesWithStorages) {
- if (!excludeNodes.contains(target) && matcher.match(
- blockManager.getDatanodeManager().getNetworkTopology(), source,
- target)) {
- if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
+ for (DatanodeInfo target : nodesWithStorages) {
+ if (!excludeNodes.contains(target)
+ && matcher.match(ctxt.getNetworkTopology(), source, target)) {
+ boolean goodTargetDn = ctxt.verifyTargetDatanodeHasSpaceForScheduling(
+ target, t, block.getBlockSize());
+ if (goodTargetDn) {
return new StorageTypeNodePair(t, target);
}
}
@@ -741,27 +728,25 @@ public class StoragePolicySatisfier implements Runnable {
private static class StorageTypeNodePair {
private StorageType storageType = null;
- private DatanodeDescriptor dn = null;
+ private DatanodeInfo dn = null;
- StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
+ StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) {
this.storageType = storageType;
this.dn = dn;
}
}
private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
- List<StorageType> expected) {
+ List<StorageType> expected, DatanodeStorageReport[] liveDns) {
StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
- List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
- .getDatanodeListForReport(DatanodeReportType.LIVE);
- for (DatanodeDescriptor dn : reports) {
+ for (DatanodeStorageReport dn : liveDns) {
StorageReport[] storageReports = dn.getStorageReports();
for (StorageReport storageReport : storageReports) {
StorageType t = storageReport.getStorage().getStorageType();
if (expected.contains(t)) {
final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
if (maxRemaining > 0L) {
- targetMap.add(t, dn);
+ targetMap.add(t, dn.getDatanodeInfo());
}
}
}
@@ -782,32 +767,40 @@ public class StoragePolicySatisfier implements Runnable {
return max;
}
- private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
- List<StorageType> existing, List<StorageType> expectedStorageTypes) {
- DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
+ private boolean checkSourceAndTargetTypeExists(DatanodeInfo dn,
+ List<StorageType> existing, List<StorageType> expectedStorageTypes,
+ DatanodeStorageReport[] liveDns) {
boolean isExpectedTypeAvailable = false;
boolean isExistingTypeAvailable = false;
- for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
- StorageType storageType = dnInfo.getStorageType();
- if (existing.contains(storageType)) {
- isExistingTypeAvailable = true;
- }
- if (expectedStorageTypes.contains(storageType)) {
- isExpectedTypeAvailable = true;
+ for (DatanodeStorageReport liveDn : liveDns) {
+ if (dn.equals(liveDn.getDatanodeInfo())) {
+ StorageReport[] storageReports = liveDn.getStorageReports();
+ for (StorageReport eachStorage : storageReports) {
+ StorageType storageType = eachStorage.getStorage().getStorageType();
+ if (existing.contains(storageType)) {
+ isExistingTypeAvailable = true;
+ }
+ if (expectedStorageTypes.contains(storageType)) {
+ isExpectedTypeAvailable = true;
+ }
+ if (isExistingTypeAvailable && isExpectedTypeAvailable) {
+ return true;
+ }
+ }
}
}
return isExistingTypeAvailable && isExpectedTypeAvailable;
}
private static class StorageTypeNodeMap {
- private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
- new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
+ private final EnumMap<StorageType, List<DatanodeInfo>> typeNodeMap =
+ new EnumMap<StorageType, List<DatanodeInfo>>(StorageType.class);
- private void add(StorageType t, DatanodeDescriptor dn) {
- List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
- LinkedList<DatanodeDescriptor> value = null;
+ private void add(StorageType t, DatanodeInfo dn) {
+ List<DatanodeInfo> nodesWithStorages = getNodesWithStorages(t);
+ LinkedList<DatanodeInfo> value = null;
if (nodesWithStorages == null) {
- value = new LinkedList<DatanodeDescriptor>();
+ value = new LinkedList<DatanodeInfo>();
value.add(dn);
typeNodeMap.put(t, value);
} else {
@@ -820,7 +813,7 @@ public class StoragePolicySatisfier implements Runnable {
* - Storage type
* @return datanodes which has the given storage type
*/
- private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
+ private List<DatanodeInfo> getNodesWithStorages(StorageType type) {
return typeNodeMap.get(type);
}
}
@@ -982,7 +975,6 @@ public class StoragePolicySatisfier implements Runnable {
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
- INode inode = namesystem.getFSDirectory().getINode(path);
- return storageMovementNeeded.getStatus(inode.getId());
+ return storageMovementNeeded.getStatus(ctxt.getFileID(path));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index 62766d9..f9762a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -25,8 +25,9 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.junit.After;
@@ -46,11 +47,15 @@ public class TestBlockStorageMovementAttemptedItems {
@Before
public void setup() throws Exception {
- unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
- Mockito.mock(Namesystem.class),
- Mockito.mock(StoragePolicySatisfier.class), 100);
- bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
- selfRetryTimeout, unsatisfiedStorageMovementFiles);
+ Configuration config = new HdfsConfiguration();
+ Context ctxt = Mockito.mock(Context.class);
+ Mockito.when(ctxt.getConf()).thenReturn(config);
+ Mockito.when(ctxt.isRunning()).thenReturn(true);
+ Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
+ Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
+ unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt);
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt,
+ unsatisfiedStorageMovementFiles);
}
@After
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcce97f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 8dc52dc..2a7bde5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -105,7 +107,8 @@ public class TestStoragePolicySatisfier {
private DistributedFileSystem dfs = null;
private static final int DEFAULT_BLOCK_SIZE = 1024;
- private void shutdownCluster() {
+ @After
+ public void shutdownCluster() {
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
@@ -1298,11 +1301,17 @@ public class TestStoragePolicySatisfier {
//entry in queue. After 10 files, traverse control will be on U.
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
Mockito.when(sps.isRunning()).thenReturn(true);
+ Context ctxt = Mockito.mock(Context.class);
+ config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
+ Mockito.when(ctxt.getConf()).thenReturn(config);
+ Mockito.when(ctxt.isRunning()).thenReturn(true);
+ Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
+ Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
BlockStorageMovementNeeded movmentNeededQueue =
- new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
+ new BlockStorageMovementNeeded(ctxt);
INode rootINode = fsDir.getINode("/root");
movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
- movmentNeededQueue.init();
+ movmentNeededQueue.init(fsDir);
//Wait for thread to reach U.
Thread.sleep(1000);
@@ -1361,9 +1370,15 @@ public class TestStoragePolicySatisfier {
Mockito.when(sps.isRunning()).thenReturn(true);
// Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U.
+ Context ctxt = Mockito.mock(Context.class);
+ config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
+ Mockito.when(ctxt.getConf()).thenReturn(config);
+ Mockito.when(ctxt.isRunning()).thenReturn(true);
+ Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
+ Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
BlockStorageMovementNeeded movmentNeededQueue =
- new BlockStorageMovementNeeded(hdfsCluster.getNamesystem(), sps, 10);
- movmentNeededQueue.init();
+ new BlockStorageMovementNeeded(ctxt);
+ movmentNeededQueue.init(fsDir);
INode rootINode = fsDir.getINode("/root");
movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
// Wait for thread to reach U.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org