Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/01/29 04:12:33 UTC
[28/50] [abbrv] hadoop git commit: HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e94f1deb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e94f1deb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e94f1deb
Branch: refs/heads/HDFS-10285
Commit: e94f1deb35c19c93fa8cf3dd3721860bca4b0310
Parents: 8d29166
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Sat Sep 30 06:31:52 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Mon Jan 29 09:21:08 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 22 +-
.../BlockStorageMovementAttemptedItems.java | 8 +-
.../namenode/BlockStorageMovementNeeded.java | 277 +++++++--
.../hdfs/server/namenode/FSTreeTraverser.java | 313 ++++++++++
.../server/namenode/ReencryptionHandler.java | 618 ++++++++-----------
.../server/namenode/ReencryptionUpdater.java | 2 +-
.../server/namenode/StoragePolicySatisfier.java | 43 +-
.../src/main/resources/hdfs-default.xml | 23 +
.../src/site/markdown/ArchivalStorage.md | 3 +-
.../TestBlockStorageMovementAttemptedItems.java | 2 +-
.../TestPersistentStoragePolicySatisfier.java | 8 +-
.../hdfs/server/namenode/TestReencryption.java | 3 -
.../namenode/TestReencryptionHandler.java | 10 +-
.../namenode/TestStoragePolicySatisfier.java | 377 ++++++++++-
15 files changed, 1260 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8c4fa69..c435739 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -602,6 +602,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.enabled";
public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT =
false;
+ public static final String DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY =
+ "dfs.storage.policy.satisfier.queue.limit";
+ public static final int DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT =
+ 1000;
+ public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION =
+ "dfs.storage.policy.satisfier.work.multiplier.per.iteration";
+ public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT =
+ 1;
public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.recheck.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =
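The two keys added above are tunable per cluster. A minimal sketch, illustrative only and not part of this patch, of setting them programmatically (the values are made up; the same names work in hdfs-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SpsTuningSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap the in-memory SPS queue at 2000 items instead of the default 1000.
    conf.setInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 2000);
    // Allow roughly two pending block moves per live DN per iteration.
    conf.setInt(DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, 2);
  }
}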
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 7465853..570b85d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1422,7 +1422,27 @@ public class DFSUtil {
"It should be a positive, non-zero integer value.");
return blocksReplWorkMultiplier;
}
-
+
+ /**
+ * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
+ * configuration.
+ *
+ * @param conf Configuration
+ * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
+ */
+ public static int getSPSWorkMultiplier(Configuration conf) {
+ int spsWorkMultiplier = conf
+ .getInt(
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+ Preconditions.checkArgument(
+ (spsWorkMultiplier > 0),
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
+ " = '" + spsWorkMultiplier + "' is invalid. " +
+ "It should be a positive, non-zero integer value.");
+ return spsWorkMultiplier;
+ }
+
/**
* Get SPNEGO keytab Key from configuration
*
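Like the replication work multiplier getter next to it, the new getter fails fast on a non-positive value. A hedged sketch of the failure mode (hypothetical standalone class, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;

public class SpsMultiplierCheckSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, 0);
    try {
      DFSUtil.getSPSWorkMultiplier(conf);
    } catch (IllegalArgumentException e) {
      // Preconditions.checkArgument rejects it: "... = '0' is invalid.
      // It should be a positive, non-zero integer value."
      System.err.println(e.getMessage());
    }
  }
}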
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 278b62b..549819f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -101,7 +101,7 @@ public class BlockStorageMovementAttemptedItems {
public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) {
AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
- itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
+ itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
allBlockLocsAttemptedToSatisfy);
storageMovementAttemptedItems.put(itemInfo.getTrackId(),
attemptedItemInfo);
@@ -260,7 +260,7 @@ public class BlockStorageMovementAttemptedItems {
synchronized (storageMovementAttemptedResults) {
if (!isExistInResult(blockCollectionID)) {
ItemInfo candidate = new ItemInfo(
- itemInfo.getRootId(), blockCollectionID);
+ itemInfo.getStartId(), blockCollectionID);
blockStorageMovementNeeded.add(candidate);
iter.remove();
LOG.info("TrackID: {} becomes timed out and moved to needed "
@@ -315,7 +315,7 @@ public class BlockStorageMovementAttemptedItems {
// blockStorageMovementNeeded#removeItemTrackInfo() for cleaning
// the xAttr
ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
- ? attemptedItemInfo.getRootId() : trackId, trackId);
+ ? attemptedItemInfo.getStartId() : trackId, trackId);
switch (status) {
case FAILURE:
if (attemptedItemInfo != null) {
@@ -345,7 +345,7 @@ public class BlockStorageMovementAttemptedItems {
if (attemptedItemInfo != null) {
if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
blockStorageMovementNeeded
- .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
+ .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
LOG.warn("{} But adding trackID back to retry queue as some of"
+ " the blocks couldn't find matching target nodes in"
+ " previous SPS iteration.", msg);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 41a3a6c..788a98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -29,12 +29,15 @@ import java.util.Map;
import java.util.Queue;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* A class to track the block collection IDs (inode IDs) for which physical
* storage movement is needed, as per the Namespace and StorageReports from DN.
@@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded {
new LinkedList<ItemInfo>();
/**
- * Map of rootId and number of child's. Number of child's indicate the number
- * of files pending to satisfy the policy.
+ * Map of startId and number of children. The child count indicates the
+ * number of files pending to satisfy the policy.
*/
- private final Map<Long, Integer> pendingWorkForDirectory =
- new HashMap<Long, Integer>();
+ private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
+ new HashMap<Long, DirPendingWorkInfo>();
private final Namesystem namesystem;
@@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded {
private final StoragePolicySatisfier sps;
- private Daemon fileInodeIdCollector;
+ private Daemon inodeIdCollector;
+
+ private final int maxQueuedItem;
public BlockStorageMovementNeeded(Namesystem namesystem,
- StoragePolicySatisfier sps) {
+ StoragePolicySatisfier sps, int queueLimit) {
this.namesystem = namesystem;
this.sps = sps;
+ this.maxQueuedItem = queueLimit;
}
/**
@@ -88,15 +94,24 @@ public class BlockStorageMovementNeeded {
/**
* Add the itemInfo to tracking list for which storage movement
* expected if necessary.
- * @param rootId
- * - root inode id
+ * @param startId
+ * - start id
* @param itemInfoList
* - List of children in the directory
*/
- private synchronized void addAll(Long rootId,
- List<ItemInfo> itemInfoList) {
+ @VisibleForTesting
+ public synchronized void addAll(long startId,
+ List<ItemInfo> itemInfoList, boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
- pendingWorkForDirectory.put(rootId, itemInfoList.size());
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ if (pendingWork == null) {
+ pendingWork = new DirPendingWorkInfo();
+ pendingWorkForDirectory.put(startId, pendingWork);
+ }
+ pendingWork.addPendingWorkCount(itemInfoList.size());
+ if (scanCompleted) {
+ pendingWork.markScanCompleted();
+ }
}
/**
@@ -118,6 +133,25 @@ public class BlockStorageMovementNeeded {
}
}
+ /**
+ * Returns queue remaining capacity.
+ */
+ public synchronized int remainingCapacity() {
+ int size = storageMovementNeeded.size();
+ if (size >= maxQueuedItem) {
+ return 0;
+ } else {
+ return (maxQueuedItem - size);
+ }
+ }
+
+ /**
+ * Returns queue size.
+ */
+ public synchronized int size() {
+ return storageMovementNeeded.size();
+ }
+
public synchronized void clearAll() {
spsDirsToBeTraveresed.clear();
storageMovementNeeded.clear();
@@ -131,20 +165,20 @@ public class BlockStorageMovementNeeded {
public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
throws IOException {
if (trackInfo.isDir()) {
- // If track is part of some root then reduce the pending directory work
- // count.
- long rootId = trackInfo.getRootId();
- INode inode = namesystem.getFSDirectory().getInode(rootId);
+ // If track is part of some start inode then reduce the pending
+ // directory work count.
+ long startId = trackInfo.getStartId();
+ INode inode = namesystem.getFSDirectory().getInode(startId);
if (inode == null) {
// directory deleted just remove it.
- this.pendingWorkForDirectory.remove(rootId);
+ this.pendingWorkForDirectory.remove(startId);
} else {
- if (pendingWorkForDirectory.get(rootId) != null) {
- Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
- pendingWorkForDirectory.put(rootId, pendingWork);
- if (pendingWork <= 0) {
- namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
- pendingWorkForDirectory.remove(rootId);
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+ if (pendingWork != null) {
+ pendingWork.decrementPendingWorkCount();
+ if (pendingWork.isDirWorkDone()) {
+ namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(startId);
}
}
}
@@ -161,7 +195,7 @@ public class BlockStorageMovementNeeded {
Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
ItemInfo next = iterator.next();
- if (next.getRootId() == trackId) {
+ if (next.getStartId() == trackId) {
iterator.remove();
}
}
@@ -208,7 +242,17 @@ public class BlockStorageMovementNeeded {
* Take a dir track ID from the spsDirsToBeTraveresed queue and collect child
* IDs to process for satisfying the policy.
*/
- private class FileInodeIdCollector implements Runnable {
+ private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
+ implements Runnable {
+
+ private int remainingCapacity = 0;
+
+ private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
+
+ StorageMovementPendingInodeIdCollector(FSDirectory dir) {
+ super(dir);
+ }
+
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
@@ -216,38 +260,36 @@ public class BlockStorageMovementNeeded {
try {
if (!namesystem.isInSafeMode()) {
FSDirectory fsd = namesystem.getFSDirectory();
- Long rootINodeId = spsDirsToBeTraveresed.poll();
- if (rootINodeId == null) {
+ Long startINodeId = spsDirsToBeTraveresed.poll();
+ if (startINodeId == null) {
// Waiting for SPS path
synchronized (spsDirsToBeTraveresed) {
spsDirsToBeTraveresed.wait(5000);
}
} else {
- INode rootInode = fsd.getInode(rootINodeId);
- if (rootInode != null) {
- // TODO : HDFS-12291
- // 1. Implement an efficient recursive directory iteration
- // mechanism and satisfies storage policy for all the files
- // under the given directory.
- // 2. Process files in batches,so datanodes workload can be
- // handled.
- List<ItemInfo> itemInfoList =
- new ArrayList<>();
- for (INode childInode : rootInode.asDirectory()
- .getChildrenList(Snapshot.CURRENT_STATE_ID)) {
- if (childInode.isFile()
- && childInode.asFile().numBlocks() != 0) {
- itemInfoList.add(
- new ItemInfo(rootINodeId, childInode.getId()));
- }
+ INode startInode = fsd.getInode(startINodeId);
+ if (startInode != null) {
+ try {
+ remainingCapacity = remainingCapacity();
+ readLock();
+ traverseDir(startInode.asDirectory(), startINodeId,
+ HdfsFileStatus.EMPTY_NAME,
+ new SPSTraverseInfo(startINodeId));
+ } finally {
+ readUnlock();
}
- if (itemInfoList.isEmpty()) {
- // satisfy track info is empty, so remove the xAttr from the
- // directory
- namesystem.removeXattr(rootINodeId,
+ // Mark startInode traversal as done
+ addAll(startInode.getId(), currentBatch, true);
+ currentBatch.clear();
+
+ // check if the directory was empty and no child was added to the queue
+ DirPendingWorkInfo dirPendingWorkInfo =
+ pendingWorkForDirectory.get(startInode.getId());
+ if (dirPendingWorkInfo.isDirWorkDone()) {
+ namesystem.removeXattr(startInode.getId(),
XATTR_SATISFY_STORAGE_POLICY);
+ pendingWorkForDirectory.remove(startInode.getId());
}
- addAll(rootINodeId, itemInfoList);
}
}
}
@@ -256,17 +298,140 @@ public class BlockStorageMovementNeeded {
}
}
}
+
+ @Override
+ protected void checkPauseForTesting() throws InterruptedException {
+ // TODO implement if needed
+ }
+
+ @Override
+ protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ assert getFSDirectory().hasReadLock();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing {} for statisy the policy",
+ inode.getFullPathName());
+ }
+ if (!inode.isFile()) {
+ return false;
+ }
+ if (inode.isFile() && inode.asFile().numBlocks() != 0) {
+ currentBatch.add(new ItemInfo(
+ ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+ remainingCapacity--;
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean canSubmitCurrentBatch() {
+ return remainingCapacity <= 0;
+ }
+
+ @Override
+ protected void checkINodeReady(long startId) throws IOException {
+ FSNamesystem fsn = ((FSNamesystem) namesystem);
+ fsn.checkNameNodeSafeMode("NN is in safe mode, "
+ + "cannot satisfy the policy.");
+ // SPS work should be cancelled when NN goes to standby. Just
+ // double checking for sanity.
+ fsn.checkOperation(NameNode.OperationCategory.WRITE);
+ }
+
+ @Override
+ protected void submitCurrentBatch(long startId)
+ throws IOException, InterruptedException {
+ // Add the current children to the queue
+ addAll(startId, currentBatch, false);
+ currentBatch.clear();
+ }
+
+ @Override
+ protected void throttle() throws InterruptedException {
+ assert !getFSDirectory().hasReadLock();
+ assert !namesystem.hasReadLock();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+ + " waiting for some free slots.");
+ }
+ remainingCapacity = remainingCapacity();
+ // wait for queue to be free
+ while (remainingCapacity <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+ }
+ Thread.sleep(5000);
+ remainingCapacity = remainingCapacity();
+ }
+ }
+
+ @Override
+ protected boolean canTraverseDir(INode inode) throws IOException {
+ return true;
+ }
}
- public void start() {
- fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
- fileInodeIdCollector.setName("FileInodeIdCollector");
- fileInodeIdCollector.start();
+ /**
+ * Info for directory recursive scan.
+ */
+ public static class DirPendingWorkInfo {
+
+ private int pendingWorkCount = 0;
+ private boolean fullyScanned = false;
+
+ /**
+ * Increment the pending work count for directory.
+ */
+ public synchronized void addPendingWorkCount(int count) {
+ this.pendingWorkCount = this.pendingWorkCount + count;
+ }
+
+ /**
+ * Decrement the pending work count for the directory once a track info
+ * is completed.
+ */
+ public synchronized void decrementPendingWorkCount() {
+ this.pendingWorkCount--;
+ }
+
+ /**
+ * Return true if all the pending work is done and the directory is fully
+ * scanned, otherwise false.
+ */
+ public synchronized boolean isDirWorkDone() {
+ return (pendingWorkCount <= 0 && fullyScanned);
+ }
+
+ /**
+ * Mark the directory scan as completed.
+ */
+ public synchronized void markScanCompleted() {
+ this.fullyScanned = true;
+ }
}
- public void stop() {
- if (fileInodeIdCollector != null) {
- fileInodeIdCollector.interrupt();
+ public void init() {
+ inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
+ namesystem.getFSDirectory()));
+ inodeIdCollector.setName("FileInodeIdCollector");
+ inodeIdCollector.start();
+ }
+
+ public void close() {
+ if (inodeIdCollector != null) {
+ inodeIdCollector.interrupt();
+ }
+ }
+
+ class SPSTraverseInfo extends TraverseInfo {
+ private long startId;
+
+ SPSTraverseInfo(long startId) {
+ this.startId = startId;
+ }
+
+ public long getStartId() {
+ return startId;
}
}
}
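The xAttr removal above is gated on DirPendingWorkInfo reporting both a drained work count and a completed scan. A minimal sketch of that accounting protocol, with made-up counts (standalone, not part of the patch):

import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded.DirPendingWorkInfo;

public class DirPendingWorkSketch {
  public static void main(String[] args) {
    DirPendingWorkInfo work = new DirPendingWorkInfo();
    work.addPendingWorkCount(2);       // a batch of two children queued mid-scan
    work.decrementPendingWorkCount();  // first child satisfied
    work.decrementPendingWorkCount();  // second child satisfied
    // Not done yet: the traversal could still queue more children.
    System.out.println(work.isDirWorkDone()); // false
    work.markScanCompleted();          // collector reached the end of the dir
    System.out.println(work.isDirWorkDone()); // true -> safe to drop the xAttr
  }
}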
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
new file mode 100644
index 0000000..acc23e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSTreeTraverser.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * FSTreeTraverser traverses directories recursively and processes files
+ * in batches.
+ */
+@InterfaceAudience.Private
+public abstract class FSTreeTraverser {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(FSTreeTraverser.class);
+
+ private FSDirectory dir;
+
+ public FSTreeTraverser(FSDirectory dir) {
+ this.dir = dir;
+ }
+
+ public FSDirectory getFSDirectory() {
+ return dir;
+ }
+
+ /**
+ * Iterate through all files directly inside parent, and recurse down
+ * directories. The listing is done in batch, and can optionally start after
+ * a position. The iteration of the inode tree is done in a depth-first
+ * fashion. But instead of holding all {@link INodeDirectory}s in memory,
+ * only the path components to the current inode are held, to reduce
+ * memory consumption.
+ *
+ * @param parent
+ * The parent directory inode.
+ * @param startId
+ * Id of the start inode.
+ * @param startAfter
+ * Full path of a file the traverse should start after.
+ * @param traverseInfo
+ * info which may be required for processing the children.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected void traverseDir(final INodeDirectory parent, final long startId,
+ byte[] startAfter, final TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ List<byte[]> startAfters = new ArrayList<>();
+ if (parent == null) {
+ return;
+ }
+ INode curr = parent;
+ // construct startAfters all the way up to the zone inode.
+ startAfters.add(startAfter);
+ while (curr.getId() != startId) {
+ startAfters.add(0, curr.getLocalNameBytes());
+ curr = curr.getParent();
+ }
+ curr = traverseDirInt(startId, parent, startAfters, traverseInfo);
+ while (!startAfters.isEmpty()) {
+ if (curr == null) {
+ // lock was reacquired, re-resolve path.
+ curr = resolvePaths(startId, startAfters);
+ }
+ curr = traverseDirInt(startId, curr, startAfters, traverseInfo);
+ }
+ }
+
+ /**
+ * Iterates the parent directory, adding direct child files to the current
+ * batch. If the batch size meets the configured threshold, the current
+ * batch will be submitted for processing.
+ * <p>
+ * Locks could be released and reacquired when a batch submission is
+ * finished.
+ *
+ * @param startId
+ * Id of the start inode.
+ * @return The inode which was just processed, if the lock is held through
+ * the entire process. Null if the lock was released.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected INode traverseDirInt(final long startId, INode curr,
+ List<byte[]> startAfters, final TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ assert dir.hasReadLock();
+ assert dir.getFSNamesystem().hasReadLock();
+ Preconditions.checkNotNull(curr, "Current inode can't be null");
+ checkINodeReady(startId);
+ final INodeDirectory parent = curr.isDirectory() ? curr.asDirectory()
+ : curr.getParent();
+ ReadOnlyList<INode> children = parent
+ .getChildrenList(Snapshot.CURRENT_STATE_ID);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Traversing directory {}", parent.getFullPathName());
+ }
+
+ final byte[] startAfter = startAfters.get(startAfters.size() - 1);
+ boolean lockReleased = false;
+ for (int i = INodeDirectory.nextChild(children, startAfter); i < children
+ .size(); ++i) {
+ final INode inode = children.get(i);
+ if (!processFileInode(inode, traverseInfo)) {
+ // inode wasn't processed. Recurse down if it's a dir,
+ // skip otherwise.
+ if (!inode.isDirectory()) {
+ continue;
+ }
+
+ if (!canTraverseDir(inode)) {
+ continue;
+ }
+ // add 1 level to the depth-first search.
+ curr = inode;
+ if (!startAfters.isEmpty()) {
+ startAfters.remove(startAfters.size() - 1);
+ startAfters.add(curr.getLocalNameBytes());
+ }
+ startAfters.add(HdfsFileStatus.EMPTY_NAME);
+ return lockReleased ? null : curr;
+ }
+ if (canSubmitCurrentBatch()) {
+ final byte[] currentStartAfter = inode.getLocalNameBytes();
+ final String parentPath = parent.getFullPathName();
+ lockReleased = true;
+ readUnlock();
+ submitCurrentBatch(startId);
+ try {
+ throttle();
+ checkPauseForTesting();
+ } finally {
+ readLock();
+ }
+ checkINodeReady(startId);
+
+ // Things could have changed when the lock was released.
+ // Re-resolve the parent inode.
+ FSPermissionChecker pc = dir.getPermissionChecker();
+ INode newParent = dir
+ .resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
+ .getLastINode();
+ if (newParent == null || !newParent.equals(parent)) {
+ // parent dir is deleted or recreated. We're done.
+ return null;
+ }
+ children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
+ // -1 to counter the ++ on the for loop
+ i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
+ }
+ }
+ // Successfully finished this dir, adjust pointers to 1 level up, and
+ // startAfter this dir.
+ startAfters.remove(startAfters.size() - 1);
+ if (!startAfters.isEmpty()) {
+ startAfters.remove(startAfters.size() - 1);
+ startAfters.add(curr.getLocalNameBytes());
+ }
+ curr = curr.getParent();
+ return lockReleased ? null : curr;
+ }
+
+ /**
+ * Resolve the cursor of traverse to an inode.
+ * <p>
+ * The parent of the lowest-level startAfter is returned. If an entry in
+ * the middle of startAfters has changed, the parent of the lowest
+ * unchanged level is returned.
+ *
+ * @param startId
+ * Id of the start inode.
+ * @param startAfters
+ * the cursor, represented by a list of path bytes.
+ * @return the parent inode corresponding to the startAfters, or null if the
+ * furthest parent is deleted.
+ */
+ private INode resolvePaths(final long startId, List<byte[]> startAfters)
+ throws IOException {
+ // If the readlock was reacquired, we need to resolve the paths again
+ // in case things have changed. If our cursor file/dir is changed,
+ // continue from the next one.
+ INode zoneNode = dir.getInode(startId);
+ if (zoneNode == null) {
+ throw new FileNotFoundException("Zone " + startId + " is deleted.");
+ }
+ INodeDirectory parent = zoneNode.asDirectory();
+ for (int i = 0; i < startAfters.size(); ++i) {
+ if (i == startAfters.size() - 1) {
+ // last startAfter does not need to be resolved, since search for
+ // nextChild will cover that automatically.
+ break;
+ }
+ INode curr = parent.getChild(startAfters.get(i),
+ Snapshot.CURRENT_STATE_ID);
+ if (curr == null) {
+ // inode at this level has changed. Update startAfters to point to
+ // the next dir at the parent level (and dropping any startAfters
+ // at lower levels).
+ for (; i < startAfters.size(); ++i) {
+ startAfters.remove(startAfters.size() - 1);
+ }
+ break;
+ }
+ parent = curr.asDirectory();
+ }
+ return parent;
+ }
+
+ protected void readLock() {
+ dir.getFSNamesystem().readLock();
+ dir.readLock();
+ }
+
+ protected void readUnlock() {
+ dir.readUnlock();
+ dir.getFSNamesystem().readUnlock("FSTreeTraverser");
+ }
+
+
+ protected abstract void checkPauseForTesting() throws InterruptedException;
+
+ /**
+ * Process an Inode. Add to current batch if it's a file, no-op otherwise.
+ *
+ * @param inode
+ * the inode
+ * @return true if the inode was added to currentBatch and should be
+ * processed further; false otherwise (e.g. the inode is not a file).
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected abstract boolean processFileInode(INode inode,
+ TraverseInfo traverseInfo) throws IOException, InterruptedException;
+
+ /**
+ * Check whether current batch can be submitted for the processing.
+ *
+ * @return true if the batch size meets the condition, otherwise false.
+ */
+ protected abstract boolean canSubmitCurrentBatch();
+
+ /**
+ * Check whether inode is ready for traverse. Throws IOE if it's not.
+ *
+ * @param startId
+ * Id of the start inode.
+ * @throws IOException
+ */
+ protected abstract void checkINodeReady(long startId) throws IOException;
+
+ /**
+ * Submit the current batch for processing.
+ *
+ * @param startId
+ * Id of the start inode.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected abstract void submitCurrentBatch(long startId)
+ throws IOException, InterruptedException;
+
+ /**
+ * Throttles the FSTreeTraverser.
+ *
+ * @throws InterruptedException
+ */
+ protected abstract void throttle() throws InterruptedException;
+
+ /**
+ * Check whether dir is traversable or not.
+ *
+ * @param inode
+ * Dir inode
+ * @return true if the dir is traversable, otherwise false.
+ * @throws IOException
+ */
+ protected abstract boolean canTraverseDir(INode inode) throws IOException;
+
+ /**
+ * Class representing additional info required for the traversal.
+ */
+ public static class TraverseInfo {
+
+ }
+}
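The contract between traverseDir() and the abstract hooks is easiest to see in a toy subclass. A hedged sketch (hypothetical class and threshold, not part of this patch; placed in the namenode package so the FSDirectory/INode types resolve):

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;

public class LoggingTraverser extends FSTreeTraverser {
  private final List<Long> batch = new ArrayList<>();

  public LoggingTraverser(FSDirectory dir) {
    super(dir);
  }

  @Override
  protected boolean processFileInode(INode inode, TraverseInfo info)
      throws IOException, InterruptedException {
    if (!inode.isFile()) {
      return false; // not handled; traverseDirInt() recurses if it's a dir
    }
    batch.add(inode.getId());
    return true;
  }

  @Override
  protected boolean canSubmitCurrentBatch() {
    return batch.size() >= 1000; // made-up threshold
  }

  @Override
  protected void submitCurrentBatch(long startId) {
    LOG.info("Would submit {} inodes under start inode {}", batch.size(),
        startId);
    batch.clear();
  }

  @Override
  protected void checkINodeReady(long startId) throws IOException {
    // no-op here; real subclasses verify NN/zone state before continuing
  }

  @Override
  protected void throttle() throws InterruptedException {
    // no backpressure in this sketch
  }

  @Override
  protected boolean canTraverseDir(INode inode) throws IOException {
    return true; // descend into every directory
  }

  @Override
  protected void checkPauseForTesting() throws InterruptedException {
    // tests only
  }
}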
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index 01c2038..9b00519 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
@@ -30,18 +31,16 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.ReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus.State;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.FileEdekInfo;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ReencryptionTask;
import org.apache.hadoop.hdfs.server.namenode.ReencryptionUpdater.ZoneSubmissionTracker;
-import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
@@ -118,6 +117,8 @@ public class ReencryptionHandler implements Runnable {
// be single-threaded, see class javadoc for more details.
private ReencryptionBatch currentBatch;
+ private ReencryptionPendingInodeIdCollector traverser;
+
private final ReencryptionUpdater reencryptionUpdater;
private ExecutorService updaterExecutor;
@@ -186,16 +187,6 @@ public class ReencryptionHandler implements Runnable {
reencryptionUpdater.pauseForTestingAfterNthCheckpoint(zoneId, count);
}
- private synchronized void checkPauseForTesting() throws InterruptedException {
- assert !dir.hasReadLock();
- assert !dir.getFSNamesystem().hasReadLock();
- while (shouldPauseForTesting) {
- LOG.info("Sleeping in the re-encrypt handler for unit test.");
- wait();
- LOG.info("Continuing re-encrypt handler after pausing.");
- }
- }
-
ReencryptionHandler(final EncryptionZoneManager ezMgr,
final Configuration conf) {
this.ezManager = ezMgr;
@@ -256,6 +247,7 @@ public class ReencryptionHandler implements Runnable {
reencryptionUpdater =
new ReencryptionUpdater(dir, batchService, this, conf);
currentBatch = new ReencryptionBatch(reencryptBatchSize);
+ traverser = new ReencryptionPendingInodeIdCollector(dir, this);
}
ReencryptionStatus getReencryptionStatus() {
@@ -339,7 +331,7 @@ public class ReencryptionHandler implements Runnable {
synchronized (this) {
wait(interval);
}
- checkPauseForTesting();
+ traverser.checkPauseForTesting();
} catch (InterruptedException ie) {
LOG.info("Re-encrypt handler interrupted. Exiting");
Thread.currentThread().interrupt();
@@ -397,7 +389,7 @@ public class ReencryptionHandler implements Runnable {
final INode zoneNode;
final ZoneReencryptionStatus zs;
- readLock();
+ traverser.readLock();
try {
zoneNode = dir.getInode(zoneId);
// start re-encrypting the zone from the beginning
@@ -419,18 +411,19 @@ public class ReencryptionHandler implements Runnable {
zoneId);
if (zs.getLastCheckpointFile() == null) {
// new re-encryption
- reencryptDir(zoneNode.asDirectory(), zoneId, HdfsFileStatus.EMPTY_NAME,
- zs.getEzKeyVersionName());
+ traverser.traverseDir(zoneNode.asDirectory(), zoneId,
+ HdfsFileStatus.EMPTY_NAME,
+ new ZoneTraverseInfo(zs.getEzKeyVersionName()));
} else {
// resuming from a past re-encryption
restoreFromLastProcessedFile(zoneId, zs);
}
// save the last batch and mark complete
- submitCurrentBatch(zoneId);
+ traverser.submitCurrentBatch(zoneId);
LOG.info("Submission completed of zone {} for re-encryption.", zoneId);
reencryptionUpdater.markZoneSubmissionDone(zoneId);
} finally {
- readUnlock();
+ traverser.readUnlock();
}
}
@@ -479,131 +472,8 @@ public class ReencryptionHandler implements Runnable {
dir.getINodesInPath(zs.getLastCheckpointFile(), FSDirectory.DirOp.READ);
parent = lpfIIP.getLastINode().getParent();
startAfter = lpfIIP.getLastINode().getLocalNameBytes();
- reencryptDir(parent, zoneId, startAfter, zs.getEzKeyVersionName());
- }
-
- /**
- * Iterate through all files directly inside parent, and recurse down
- * directories. The listing is done in batch, and can optionally start after
- * a position.
- * <p>
- * Each batch is then send to the threadpool, where KMS will be contacted and
- * edek re-encrypted. {@link ReencryptionUpdater} handles the tasks completed
- * from the threadpool.
- * <p>
- * The iteration of the inode tree is done in a depth-first fashion. But
- * instead of holding all INodeDirectory's in memory on the fly, only the
- * path components to the current inode is held. This is to reduce memory
- * consumption.
- *
- * @param parent The inode id of parent directory
- * @param zoneId Id of the EZ inode
- * @param startAfter Full path of a file the re-encrypt should start after.
- * @throws IOException
- * @throws InterruptedException
- */
- private void reencryptDir(final INodeDirectory parent, final long zoneId,
- byte[] startAfter, final String ezKeyVerName)
- throws IOException, InterruptedException {
- List<byte[]> startAfters = new ArrayList<>();
- if (parent == null) {
- return;
- }
- INode curr = parent;
- // construct startAfters all the way up to the zone inode.
- startAfters.add(startAfter);
- while (curr.getId() != zoneId) {
- startAfters.add(0, curr.getLocalNameBytes());
- curr = curr.getParent();
- }
- curr = reencryptDirInt(zoneId, parent, startAfters, ezKeyVerName);
- while (!startAfters.isEmpty()) {
- if (curr == null) {
- // lock was reacquired, re-resolve path.
- curr = resolvePaths(zoneId, startAfters);
- }
- curr = reencryptDirInt(zoneId, curr, startAfters, ezKeyVerName);
- }
- }
-
- /**
- * Resolve the cursor of re-encryption to an inode.
- * <p>
- * The parent of the lowest level startAfter is returned. If somewhere in the
- * middle of startAfters changed, the parent of the lowest unchanged level is
- * returned.
- *
- * @param zoneId Id of the EZ inode.
- * @param startAfters the cursor, represented by a list of path bytes.
- * @return the parent inode corresponding to the startAfters, or null if
- * the EZ node (furthest parent) is deleted.
- */
- private INode resolvePaths(final long zoneId, List<byte[]> startAfters)
- throws IOException {
- // If the readlock was reacquired, we need to resolve the paths again
- // in case things have changed. If our cursor file/dir is changed,
- // continue from the next one.
- INode zoneNode = dir.getInode(zoneId);
- if (zoneNode == null) {
- throw new FileNotFoundException("Zone " + zoneId + " is deleted.");
- }
- INodeDirectory parent = zoneNode.asDirectory();
- for (int i = 0; i < startAfters.size(); ++i) {
- if (i == startAfters.size() - 1) {
- // last startAfter does not need to be resolved, since search for
- // nextChild will cover that automatically.
- break;
- }
- INode curr =
- parent.getChild(startAfters.get(i), Snapshot.CURRENT_STATE_ID);
- if (curr == null) {
- // inode at this level has changed. Update startAfters to point to
- // the next dir at the parent level (and dropping any startAfters
- // at lower levels).
- for (; i < startAfters.size(); ++i) {
- startAfters.remove(startAfters.size() - 1);
- }
- break;
- }
- parent = curr.asDirectory();
- }
- return parent;
- }
-
- /**
- * Submit the current batch to the thread pool.
- *
- * @param zoneId Id of the EZ INode
- * @throws IOException
- * @throws InterruptedException
- */
- private void submitCurrentBatch(final long zoneId)
- throws IOException, InterruptedException {
- assert dir.hasReadLock();
- if (currentBatch.isEmpty()) {
- return;
- }
- ZoneSubmissionTracker zst;
- synchronized (this) {
- zst = submissions.get(zoneId);
- if (zst == null) {
- zst = new ZoneSubmissionTracker();
- submissions.put(zoneId, zst);
- }
- }
- Future future = batchService
- .submit(new EDEKReencryptCallable(zoneId, currentBatch, this));
- zst.addTask(future);
- LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
- currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
- currentBatch = new ReencryptionBatch(reencryptBatchSize);
- // flip the pause flag if this is nth submission.
- // The actual pause need to happen outside of the lock.
- if (pauseAfterNthSubmission > 0) {
- if (--pauseAfterNthSubmission == 0) {
- shouldPauseForTesting = true;
- }
- }
+ traverser.traverseDir(parent, zoneId, startAfter,
+ new ZoneTraverseInfo(zs.getEzKeyVersionName()));
}
final class ReencryptionBatch {
@@ -711,256 +581,270 @@ public class ReencryptionHandler implements Runnable {
}
}
+
/**
- * Iterates the parent directory, and add direct children files to
- * current batch. If batch size meets configured threshold, a Callable
- * is created and sent to the thread pool, which will communicate to the KMS
- * to get new edeks.
- * <p>
- * Locks could be released and reacquired when a Callable is created.
- *
- * @param zoneId Id of the EZ INode
- * @return The inode which was just processed, if lock is held in the entire
- * process. Null if lock is released.
- * @throws IOException
- * @throws InterruptedException
+ * Called when a new zone is submitted for re-encryption. This will interrupt
+ * the background thread if it's waiting for the next
+ * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
*/
- private INode reencryptDirInt(final long zoneId, INode curr,
- List<byte[]> startAfters, final String ezKeyVerName)
- throws IOException, InterruptedException {
- assert dir.hasReadLock();
- assert dir.getFSNamesystem().hasReadLock();
- Preconditions.checkNotNull(curr, "Current inode can't be null");
- checkZoneReady(zoneId);
- final INodeDirectory parent =
- curr.isDirectory() ? curr.asDirectory() : curr.getParent();
- ReadOnlyList<INode> children =
- parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Re-encrypting directory {}", parent.getFullPathName());
- }
-
- final byte[] startAfter = startAfters.get(startAfters.size() - 1);
- boolean lockReleased = false;
- for (int i = INodeDirectory.nextChild(children, startAfter);
- i < children.size(); ++i) {
- final INode inode = children.get(i);
- if (!reencryptINode(inode, ezKeyVerName)) {
- // inode wasn't added for re-encryption. Recurse down if it's a dir,
- // skip otherwise.
- if (!inode.isDirectory()) {
- continue;
- }
- if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
- // nested EZ, ignore.
- LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
- inode.getFullPathName(), inode.getId());
- continue;
+ synchronized void notifyNewSubmission() {
+ LOG.debug("Notifying handler for new re-encryption command.");
+ this.notify();
+ }
+
+ public ReencryptionPendingInodeIdCollector getTraverser() {
+ return traverser;
+ }
+
+ /**
+ * ReencryptionPendingInodeIdCollector, which throttles based on the
+ * configured throttle ratio.
+ */
+ class ReencryptionPendingInodeIdCollector extends FSTreeTraverser {
+
+ private ReencryptionHandler reencryptionHandler;
+
+ ReencryptionPendingInodeIdCollector(FSDirectory dir,
+ ReencryptionHandler rHandler) {
+ super(dir);
+ this.reencryptionHandler = rHandler;
+ }
+
+ @Override
+ protected void checkPauseForTesting()
+ throws InterruptedException {
+ assert !dir.hasReadLock();
+ assert !dir.getFSNamesystem().hasReadLock();
+ while (shouldPauseForTesting) {
+ LOG.info("Sleeping in the re-encrypt handler for unit test.");
+ synchronized (reencryptionHandler) {
+ reencryptionHandler.wait(30000);
}
- // add 1 level to the depth-first search.
- curr = inode;
- if (!startAfters.isEmpty()) {
- startAfters.remove(startAfters.size() - 1);
- startAfters.add(curr.getLocalNameBytes());
+ LOG.info("Continuing re-encrypt handler after pausing.");
+ }
+ }
+
+ /**
+ * Process an Inode for re-encryption. Add to current batch if it's a file,
+ * no-op otherwise.
+ *
+ * @param inode
+ * the inode
+ * @return true if the inode was added to currentBatch and should be
+ * re-encrypted; false otherwise (e.g. the inode is not a file, or the
+ * edek's key version has not changed).
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ assert dir.hasReadLock();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
+ }
+ if (!inode.isFile()) {
+ return false;
+ }
+ FileEncryptionInfo feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
+ dir, INodesInPath.fromINode(inode));
+ if (feInfo == null) {
+ LOG.warn("File {} skipped re-encryption because it is not encrypted! "
+ + "This is very likely a bug.", inode.getId());
+ return false;
+ }
+ if (traverseInfo instanceof ZoneTraverseInfo
+ && ((ZoneTraverseInfo) traverseInfo).getEzKeyVerName().equals(
+ feInfo.getEzKeyVersionName())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File {} skipped re-encryption because edek's key version"
+ + " name is not changed.", inode.getFullPathName());
}
- startAfters.add(HdfsFileStatus.EMPTY_NAME);
- return lockReleased ? null : curr;
+ return false;
+ }
+ currentBatch.add(inode.asFile());
+ return true;
+ }
+
+ /**
+ * Check whether the zone is ready for re-encryption. Throws IOE if it's
+ * not: 1. if the EZ is deleted; 2. if the re-encryption is canceled;
+ * 3. if the NN is not active or is in safe mode.
+ *
+ * @throws IOException
+ * if zone does not exist / is cancelled, or if NN is not ready
+ * for write.
+ */
+ @Override
+ protected void checkINodeReady(long zoneId) throws IOException {
+ final ZoneReencryptionStatus zs = getReencryptionStatus().getZoneStatus(
+ zoneId);
+ if (zs == null) {
+ throw new IOException("Zone " + zoneId + " status cannot be found.");
+ }
+ if (zs.isCanceled()) {
+ throw new IOException("Re-encryption is canceled for zone " + zoneId);
}
- if (currentBatch.size() >= reencryptBatchSize) {
- final byte[] currentStartAfter = inode.getLocalNameBytes();
- final String parentPath = parent.getFullPathName();
- submitCurrentBatch(zoneId);
- lockReleased = true;
- readUnlock();
- try {
- throttle();
- checkPauseForTesting();
- } finally {
- readLock();
+ dir.getFSNamesystem().checkNameNodeSafeMode(
+ "NN is in safe mode, cannot re-encrypt.");
+ // re-encryption should be cancelled when NN goes to standby. Just
+ // double checking for sanity.
+ dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
+ }
+
+ /**
+ * Submit the current batch to the thread pool.
+ *
+ * @param zoneId
+ * Id of the EZ INode
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ protected void submitCurrentBatch(final long zoneId) throws IOException,
+ InterruptedException {
+ if (currentBatch.isEmpty()) {
+ return;
+ }
+ ZoneSubmissionTracker zst;
+ synchronized (ReencryptionHandler.this) {
+ zst = submissions.get(zoneId);
+ if (zst == null) {
+ zst = new ZoneSubmissionTracker();
+ submissions.put(zoneId, zst);
}
- checkZoneReady(zoneId);
-
- // Things could have changed when the lock was released.
- // Re-resolve the parent inode.
- FSPermissionChecker pc = dir.getPermissionChecker();
- INode newParent =
- dir.resolvePath(pc, parentPath, FSDirectory.DirOp.READ)
- .getLastINode();
- if (newParent == null || !newParent.equals(parent)) {
- // parent dir is deleted or recreated. We're done.
- return null;
+ }
+ Future future = batchService.submit(new EDEKReencryptCallable(zoneId,
+ currentBatch, reencryptionHandler));
+ zst.addTask(future);
+ LOG.info("Submitted batch (start:{}, size:{}) of zone {} to re-encrypt.",
+ currentBatch.getFirstFilePath(), currentBatch.size(), zoneId);
+ currentBatch = new ReencryptionBatch(reencryptBatchSize);
+ // flip the pause flag if this is nth submission.
+ // The actual pause need to happen outside of the lock.
+ if (pauseAfterNthSubmission > 0) {
+ if (--pauseAfterNthSubmission == 0) {
+ shouldPauseForTesting = true;
}
- children = parent.getChildrenList(Snapshot.CURRENT_STATE_ID);
- // -1 to counter the ++ on the for loop
- i = INodeDirectory.nextChild(children, currentStartAfter) - 1;
}
}
- // Successfully finished this dir, adjust pointers to 1 level up, and
- // startAfter this dir.
- startAfters.remove(startAfters.size() - 1);
- if (!startAfters.isEmpty()) {
- startAfters.remove(startAfters.size() - 1);
- startAfters.add(curr.getLocalNameBytes());
- }
- curr = curr.getParent();
- return lockReleased ? null : curr;
- }
- private void readLock() {
- dir.getFSNamesystem().readLock();
- dir.readLock();
- throttleTimerLocked.start();
- }
+ /**
+ * Throttles the ReencryptionHandler in 3 aspects:
+ * 1. Prevents generating more Callables than the CPU could possibly
+ * handle.
+ * 2. Prevents generating more Callables than the ReencryptionUpdater
+ * can handle, under its own throttling.
+ * 3. Prevents contending FSN/FSD read locks. This is done based
+ * on the DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
+ * <p>
+ * Item 1 and 2 are to control NN heap usage.
+ *
+ * @throws InterruptedException
+ */
+ @VisibleForTesting
+ @Override
+ protected void throttle() throws InterruptedException {
+ assert !dir.hasReadLock();
+ assert !dir.getFSNamesystem().hasReadLock();
+ final int numCores = Runtime.getRuntime().availableProcessors();
+ if (taskQueue.size() >= numCores) {
+ LOG.debug("Re-encryption handler throttling because queue size {} is"
+ + "larger than number of cores {}", taskQueue.size(), numCores);
+ while (taskQueue.size() >= numCores) {
+ Thread.sleep(100);
+ }
+ }
- private void readUnlock() {
- dir.readUnlock();
- dir.getFSNamesystem().readUnlock("reencryptHandler");
- throttleTimerLocked.stop();
- }
+ // 2. if tasks are piling up on the updater, don't create new callables
+ // until the queue size goes down.
+ final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
+ int numTasks = numTasksSubmitted();
+ if (numTasks >= maxTasksPiled) {
+ LOG.debug("Re-encryption handler throttling because total tasks pending"
+ + " re-encryption updater is {}", numTasks);
+ while (numTasks >= maxTasksPiled) {
+ Thread.sleep(500);
+ numTasks = numTasksSubmitted();
+ }
+ }
- /**
- * Throttles the ReencryptionHandler in 3 aspects:
- * 1. Prevents generating more Callables than the CPU could possibly handle.
- * 2. Prevents generating more Callables than the ReencryptionUpdater can
- * handle, under its own throttling
- * 3. Prevents contending FSN/FSD read locks. This is done based on the
- * DFS_NAMENODE_REENCRYPT_THROTTLE_LIMIT_RATIO_KEY configuration.
- * <p>
- * Item 1 and 2 are to control NN heap usage.
- *
- * @throws InterruptedException
- */
- @VisibleForTesting
- void throttle() throws InterruptedException {
- // 1.
- final int numCores = Runtime.getRuntime().availableProcessors();
- if (taskQueue.size() >= numCores) {
- LOG.debug("Re-encryption handler throttling because queue size {} is"
- + "larger than number of cores {}", taskQueue.size(), numCores);
- while (taskQueue.size() >= numCores) {
- Thread.sleep(100);
+ // 3.
+ if (throttleLimitHandlerRatio >= 1.0) {
+ return;
+ }
+ final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
+ * throttleLimitHandlerRatio);
+ final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
+ + " throttleTimerAll:{}", expect, actual,
+ throttleTimerAll.now(TimeUnit.MILLISECONDS));
}
+ if (expect - actual < 0) {
+ // in case throttleLimitHandlerRatio is very small, expect will be 0.
+ // so sleepMs should not be calculated from expect, to really meet the
+ // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
+ // should be 1000 - throttleTimerAll.now()
+ final long sleepMs = (long) (actual / throttleLimitHandlerRatio)
+ - throttleTimerAll.now(TimeUnit.MILLISECONDS);
+ LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
+ Thread.sleep(sleepMs);
+ }
+ throttleTimerAll.reset().start();
+ throttleTimerLocked.reset();
}
- // 2. if tasks are piling up on the updater, don't create new callables
- // until the queue size goes down.
- final int maxTasksPiled = Runtime.getRuntime().availableProcessors() * 2;
- int numTasks = numTasksSubmitted();
- if (numTasks >= maxTasksPiled) {
- LOG.debug("Re-encryption handler throttling because total tasks pending"
- + " re-encryption updater is {}", numTasks);
- while (numTasks >= maxTasksPiled) {
- Thread.sleep(500);
- numTasks = numTasksSubmitted();
+ private int numTasksSubmitted() {
+ int ret = 0;
+ synchronized (ReencryptionHandler.this) {
+ for (ZoneSubmissionTracker zst : submissions.values()) {
+ ret += zst.getTasks().size();
+ }
}
+ return ret;
}
- // 3.
- if (throttleLimitHandlerRatio >= 1.0) {
- return;
- }
- final long expect = (long) (throttleTimerAll.now(TimeUnit.MILLISECONDS)
- * throttleLimitHandlerRatio);
- final long actual = throttleTimerLocked.now(TimeUnit.MILLISECONDS);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Re-encryption handler throttling expect: {}, actual: {},"
- + " throttleTimerAll:{}", expect, actual,
- throttleTimerAll.now(TimeUnit.MILLISECONDS));
- }
- if (expect - actual < 0) {
- // in case throttleLimitHandlerRatio is very small, expect will be 0.
- // so sleepMs should not be calculated from expect, to really meet the
- // ratio. e.g. if ratio is 0.001, expect = 0 and actual = 1, sleepMs
- // should be 1000 - throttleTimerAll.now()
- final long sleepMs =
- (long) (actual / throttleLimitHandlerRatio) - throttleTimerAll
- .now(TimeUnit.MILLISECONDS);
- LOG.debug("Throttling re-encryption, sleeping for {} ms", sleepMs);
- Thread.sleep(sleepMs);
+ @Override
+ public boolean canSubmitCurrentBatch() {
+ return currentBatch.size() >= reencryptBatchSize;
}
- throttleTimerAll.reset().start();
- throttleTimerLocked.reset();
- }
- private synchronized int numTasksSubmitted() {
- int ret = 0;
- for (ZoneSubmissionTracker zst : submissions.values()) {
- ret += zst.getTasks().size();
+ @Override
+ public boolean canTraverseDir(INode inode) throws IOException {
+ if (ezManager.isEncryptionZoneRoot(inode, inode.getFullPathName())) {
+ // nested EZ, ignore.
+ LOG.info("{}({}) is a nested EZ, skipping for re-encryption",
+ inode.getFullPathName(), inode.getId());
+ return false;
+ }
+ return true;
}
- return ret;
- }
- /**
- * Process an Inode for re-encryption. Add to current batch if it's a file,
- * no-op otherwise.
- *
- * @param inode the inode
- * @return true if inode is added to currentBatch and should be re-encrypted.
- * false otherwise: could be inode is not a file, or inode's edek's
- * key version is not changed.
- * @throws IOException
- * @throws InterruptedException
- */
- private boolean reencryptINode(final INode inode, final String ezKeyVerName)
- throws IOException, InterruptedException {
- assert dir.hasReadLock();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing {} for re-encryption", inode.getFullPathName());
- }
- if (!inode.isFile()) {
- return false;
- }
- FileEncryptionInfo feInfo = FSDirEncryptionZoneOp
- .getFileEncryptionInfo(dir, INodesInPath.fromINode(inode));
- if (feInfo == null) {
- LOG.warn("File {} skipped re-encryption because it is not encrypted! "
- + "This is very likely a bug.", inode.getId());
- return false;
+ @Override
+ protected void readLock() {
+ dir.getFSNamesystem().readLock();
+ dir.readLock();
+ throttleTimerLocked.start();
}
- if (ezKeyVerName.equals(feInfo.getEzKeyVersionName())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("File {} skipped re-encryption because edek's key version"
- + " name is not changed.", inode.getFullPathName());
- }
- return false;
+
+ @Override
+ protected void readUnlock() {
+ dir.readUnlock();
+ dir.getFSNamesystem().readUnlock("reencryptHandler");
+ throttleTimerLocked.stop();
}
- currentBatch.add(inode.asFile());
- return true;
}
- /**
- * Check whether zone is ready for re-encryption. Throws IOE if it's not.
- * 1. If EZ is deleted.
- * 2. if the re-encryption is canceled.
- * 3. If NN is not active or is in safe mode.
- *
- * @throws IOException if zone does not exist / is cancelled, or if NN is not
- * ready for write.
- */
- void checkZoneReady(final long zoneId)
- throws RetriableException, SafeModeException, IOException {
- final ZoneReencryptionStatus zs =
- getReencryptionStatus().getZoneStatus(zoneId);
- if (zs == null) {
- throw new IOException("Zone " + zoneId + " status cannot be found.");
- }
- if (zs.isCanceled()) {
- throw new IOException("Re-encryption is canceled for zone " + zoneId);
+ class ZoneTraverseInfo extends TraverseInfo {
+ private String ezKeyVerName;
+
+ ZoneTraverseInfo(String ezKeyVerName) {
+ this.ezKeyVerName = ezKeyVerName;
}
- dir.getFSNamesystem()
- .checkNameNodeSafeMode("NN is in safe mode, cannot re-encrypt.");
- // re-encryption should be cancelled when NN goes to standby. Just
- // double checking for sanity.
- dir.getFSNamesystem().checkOperation(NameNode.OperationCategory.WRITE);
- }
- /**
- * Called when a new zone is submitted for re-encryption. This will interrupt
- * the background thread if it's waiting for the next
- * DFS_NAMENODE_REENCRYPT_SLEEP_INTERVAL_KEY.
- */
- synchronized void notifyNewSubmission() {
- LOG.debug("Notifying handler for new re-encryption command.");
- this.notify();
+ public String getEzKeyVerName() {
+ return ezKeyVerName;
+ }
}
}
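The ratio-based leg of the throttle (item 3 above) sleeps just long enough that locked time over total time falls back to the configured ratio. A worked sketch of the arithmetic with made-up timer readings (not taken from real runs):

public class ThrottleMathSketch {
  public static void main(String[] args) throws InterruptedException {
    double ratio = 0.5;  // throttleLimitHandlerRatio
    long totalMs = 1000; // throttleTimerAll reading
    long lockedMs = 700; // throttleTimerLocked reading
    long expect = (long) (totalMs * ratio); // 500 ms of lock time allowed
    if (expect - lockedMs < 0) {
      // Sleep so that lockedMs / (totalMs + sleepMs) drops back to the ratio:
      long sleepMs = (long) (lockedMs / ratio) - totalMs; // 1400 - 1000 = 400
      Thread.sleep(sleepMs);
    }
  }
}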
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
index 3b7badb..a5923a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionUpdater.java
@@ -464,7 +464,7 @@ public final class ReencryptionUpdater implements Runnable {
final String zonePath;
dir.writeLock();
try {
- handler.checkZoneReady(task.zoneId);
+ handler.getTraverser().checkINodeReady(task.zoneId);
final INode zoneNode = dir.getInode(task.zoneId);
if (zoneNode == null) {
// ez removed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 48d0598..a4372d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable {
private final BlockStorageMovementNeeded storageMovementNeeded;
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
-
+ private int spsWorkMultiplier;
+ private long blockCount = 0L;
/**
* Represents the collective analysis status for all blocks.
*/
@@ -106,7 +107,9 @@ public class StoragePolicySatisfier implements Runnable {
final BlockManager blkManager, Configuration conf) {
this.namesystem = namesystem;
this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
- this);
+ this, conf.getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
this.blockManager = blkManager;
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
conf.getLong(
@@ -117,6 +120,7 @@ public class StoragePolicySatisfier implements Runnable {
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
storageMovementNeeded,
this);
+ this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
}
/**
@@ -143,7 +147,7 @@ public class StoragePolicySatisfier implements Runnable {
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
- storageMovementNeeded.start();
+ storageMovementNeeded.init();
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
@@ -164,7 +168,7 @@ public class StoragePolicySatisfier implements Runnable {
return;
}
- storageMovementNeeded.stop();
+ storageMovementNeeded.close();
storagePolicySatisfierThread.interrupt();
this.storageMovementsMonitor.stop();
@@ -268,9 +272,13 @@ public class StoragePolicySatisfier implements Runnable {
}
}
}
- // TODO: We can think to make this as configurable later, how frequently
- // we want to check block movements.
- Thread.sleep(3000);
+ int numLiveDn = namesystem.getFSDirectory().getBlockManager()
+ .getDatanodeManager().getNumLiveDataNodes();
+ if (storageMovementNeeded.size() == 0
+ || blockCount > (numLiveDn * spsWorkMultiplier)) {
+ Thread.sleep(3000);
+ blockCount = 0L;
+ }
} catch (Throwable t) {
handleException(t);
}
@@ -380,6 +388,11 @@ public class StoragePolicySatisfier implements Runnable {
assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
blockMovingInfos, coordinatorNode);
+ int count = 0;
+ for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+ count = count + blkMovingInfo.getSources().length;
+ }
+ blockCount = blockCount + count;
return status;
}
@@ -840,7 +853,7 @@ public class StoragePolicySatisfier implements Runnable {
* - file inode/blockcollection id.
*/
public void satisfyStoragePolicy(Long inodeId) {
- //For file rootId and trackId is same
+ // For a file, startId and trackId are the same.
storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
if (LOG.isDebugEnabled()) {
LOG.debug("Added track info for inode {} to block "
@@ -864,19 +877,19 @@ public class StoragePolicySatisfier implements Runnable {
* policy.
*/
public static class ItemInfo {
- private long rootId;
+ private long startId;
private long trackId;
- public ItemInfo(long rootId, long trackId) {
- this.rootId = rootId;
+ public ItemInfo(long startId, long trackId) {
+ this.startId = startId;
this.trackId = trackId;
}
/**
- * Return the root of the current track Id.
+ * Returns the start inode id for the current track id.
*/
- public long getRootId() {
- return rootId;
+ public long getStartId() {
+ return startId;
}
/**
@@ -890,7 +903,7 @@ public class StoragePolicySatisfier implements Runnable {
* Returns true if the tracking path is a directory, false otherwise.
*/
public boolean isDir() {
- return (rootId != trackId);
+ return (startId != trackId);
}
}
}
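
Editor's note: as an aside on the rename above, here is a minimal sketch (inode
ids are hypothetical) of how the startId/trackId pair distinguishes a file
submitted directly from a file tracked under a directory:

    import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;

    public class ItemInfoSketch {
      public static void main(String[] args) {
        long dirInodeId = 16386L;  // hypothetical directory inode id
        long fileInodeId = 16400L; // hypothetical file inode id

        // A file submitted directly: startId and trackId are the same.
        ItemInfo fileItem = new ItemInfo(fileInodeId, fileInodeId);
        // A file found while traversing a directory: startId is the
        // directory's inode id, trackId is the file's inode id.
        ItemInfo dirItem = new ItemInfo(dirInodeId, fileInodeId);

        System.out.println(fileItem.isDir()); // false
        System.out.println(dirItem.isDir());  // true
      }
    }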
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 48799ea..0298997 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4367,6 +4367,29 @@
</property>
<property>
+ <name>dfs.storage.policy.satisfier.queue.limit</name>
+ <value>1000</value>
+ <description>
+ Storage policy satisfier queue size. This queue contains the inode IDs
+ of the files currently scheduled for policy satisfaction.
+ Default value is 1000.
+ </description>
+</property>
+
+<property>
+ <name>dfs.storage.policy.satisfier.work.multiplier.per.iteration</name>
+ <value>1</value>
+ <description>
+ *Note*: Advanced property. Change with caution.
+ This determines the total number of block transfers to begin in
+ one iteration of policy satisfaction. The actual number is obtained by
+ multiplying this multiplier by the total number of live datanodes in
+ the cluster; the result is the number of block transfers to begin
+ immediately. This value must be a positive, non-zero integer.
+ </description>
+</property>
+
+<property>
<name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
<value>300000</value>
<description>
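
Editor's note: a minimal tuning sketch for the two new keys (the values are
illustrative only; the key strings match the property names above):

    import org.apache.hadoop.conf.Configuration;

    public class SpsTuningSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap the number of inode IDs queued for policy satisfaction.
        conf.setInt("dfs.storage.policy.satisfier.queue.limit", 1000);
        // Scale per-iteration block transfers with cluster size.
        conf.setInt(
            "dfs.storage.policy.satisfier.work.multiplier.per.iteration", 1);

        // Per the descriptions above, the satisfier schedules roughly
        // multiplier * liveDataNodes block transfers before throttling.
        int liveDataNodes = 50; // hypothetical cluster size
        int multiplier = conf.getInt(
            "dfs.storage.policy.satisfier.work.multiplier.per.iteration", 1);
        System.out.println("Transfers per iteration: "
            + liveDataNodes * multiplier);
      }
    }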
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index 87817cf..da61842 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -110,7 +110,7 @@ SPS can be enabled and disabled dynamically without restarting the Namenode.
Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
-* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS will consider the files which are immediate to that directory. Sub-directories won't be considered for satisfying the policy. Its user responsibility to call this API on directories recursively, to track all files under the sub tree.
+* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, SPS will recursively scan all sub-directories and consider every file under them for satisfying the policy.
* HdfsAdmin API :
`public void satisfyStoragePolicy(final Path path) throws IOException`
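
Editor's note: a minimal client-side usage sketch of the API above (the
NameNode URI and path here are hypothetical):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.client.HdfsAdmin;

    public class SatisfyPolicyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HdfsAdmin admin =
            new HdfsAdmin(URI.create("hdfs://localhost:9000"), conf);
        // With this change, calling the API on a directory schedules
        // policy satisfaction for every file under it, recursively.
        admin.satisfyStoragePolicy(new Path("/archive/dir"));
      }
    }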
@@ -212,7 +212,6 @@ Get the storage policy of a file or a directory.
### Satisfy Storage Policy
Schedule blocks to move based on file's/directory's current storage policy.
-Note: For directory case, it will consider immediate files under that directory and it will not consider sub directories recursively.
* Command:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 55ebf9c..7918821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems {
public void setup() throws Exception {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
Mockito.mock(Namesystem.class),
- Mockito.mock(StoragePolicySatisfier.class));
+ Mockito.mock(StoragePolicySatisfier.class), 100);
StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e7b9148..5bce296 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -191,7 +191,7 @@ public class TestPersistentStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
- childFileName, StorageType.DEFAULT, 3, timeout, fs);
+ childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
@@ -232,7 +232,9 @@ public class TestPersistentStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
- childFileName, StorageType.DEFAULT, 3, timeout, fs);
+ childFileName, StorageType.DISK, 1, timeout, fs);
+ DFSTestUtil.waitExpectedStorageType(
+ childFileName, StorageType.ARCHIVE, 2, timeout, fs);
} finally {
clusterShutdown();
}
@@ -269,7 +271,7 @@ public class TestPersistentStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
DFSTestUtil.waitExpectedStorageType(
- childFileName, StorageType.DEFAULT, 3, timeout, fs);
+ childFileName, StorageType.ARCHIVE, 3, timeout, fs);
} finally {
clusterShutdown();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
index f0d6834..7685f31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryption.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Supplier;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -64,7 +63,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -72,7 +70,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import org.junit.rules.Timeout;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
index e2035ed..3481b42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReencryptionHandler.java
@@ -75,6 +75,10 @@ public class TestReencryptionHandler {
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
Mockito.when(ezm.getProvider()).thenReturn(
KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp));
+ FSDirectory fsd = Mockito.mock(FSDirectory.class);
+ FSNamesystem fns = Mockito.mock(FSNamesystem.class);
+ Mockito.when(fsd.getFSNamesystem()).thenReturn(fns);
+ Mockito.when(ezm.getFSDirectory()).thenReturn(fsd);
return new ReencryptionHandler(ezm, conf);
}
@@ -99,7 +103,7 @@ public class TestReencryptionHandler {
Whitebox.setInternalState(rh, "throttleTimerLocked", mockLocked);
Whitebox.setInternalState(rh, "taskQueue", queue);
final StopWatch sw = new StopWatch().start();
- rh.throttle();
+ rh.getTraverser().throttle();
sw.stop();
assertTrue("should have throttled for at least 8 second",
sw.now(TimeUnit.MILLISECONDS) > 8000);
@@ -130,7 +134,7 @@ public class TestReencryptionHandler {
submissions = new HashMap<>();
Whitebox.setInternalState(rh, "submissions", submissions);
StopWatch sw = new StopWatch().start();
- rh.throttle();
+ rh.getTraverser().throttle();
sw.stop();
assertTrue("should not have throttled",
sw.now(TimeUnit.MILLISECONDS) < 1000);
@@ -189,7 +193,7 @@ public class TestReencryptionHandler {
Whitebox.setInternalState(rh, "submissions", submissions);
final StopWatch sw = new StopWatch().start();
removeTaskThread.start();
- rh.throttle();
+ rh.getTraverser().throttle();
sw.stop();
LOG.info("Throttle completed, consumed {}", sw.now(TimeUnit.MILLISECONDS));
assertTrue("should have throttled for at least 3 second",