You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:15 UTC
[22/50] [abbrv] hadoop git commit: HDFS-13025. [SPS]: Implement a
mechanism to scan the files for external SPS. Contributed by Uma Maheswara
Rao G.
HDFS-13025. [SPS]: Implement a mechanism to scan the files for external SPS. Contributed by Uma Maheswara Rao G.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/35063065
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/35063065
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/35063065
Branch: refs/heads/HDFS-10285
Commit: 35063065c0540d1010cc11eab338e6603834aca5
Parents: d8400f7
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Tue Jan 23 20:09:26 2018 +0530
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Thu Aug 9 20:47:22 2018 -0700
----------------------------------------------------------------------
.../sps/BlockStorageMovementNeeded.java | 70 +++-
.../hdfs/server/namenode/sps/Context.java | 8 +
.../IntraSPSNameNodeBlockMoveTaskHandler.java | 2 +
.../namenode/sps/IntraSPSNameNodeContext.java | 7 +
.../sps/IntraSPSNameNodeFileIdCollector.java | 6 +-
.../hdfs/server/namenode/sps/SPSService.java | 10 +-
.../namenode/sps/StoragePolicySatisfier.java | 8 +-
.../server/sps/ExternalSPSFileIDCollector.java | 156 +++++++++
.../hadoop/hdfs/server/sps/package-info.java | 28 ++
.../sps/TestStoragePolicySatisfier.java | 323 ++++++++++---------
.../sps/TestExternalStoragePolicySatisfier.java | 108 +++++++
11 files changed, 556 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 39a0051..b141502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -97,23 +97,53 @@ public class BlockStorageMovementNeeded {
}
/**
- * Add the itemInfo to tracking list for which storage movement
- * expected if necessary.
+ * Add the itemInfo list to tracking list for which storage movement expected
+ * if necessary.
+ *
* @param startId
- * - start id
+ * - start id
* @param itemInfoList
- * - List of child in the directory
+ * - List of child in the directory
+ * @param scanCompleted
+ * -Indicates whether the start id directory has no more elements to
+ * scan.
*/
@VisibleForTesting
- public synchronized void addAll(long startId,
- List<ItemInfo> itemInfoList, boolean scanCompleted) {
+ public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
+ boolean scanCompleted) {
storageMovementNeeded.addAll(itemInfoList);
+ updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
+ }
+
+ /**
+ * Add the itemInfo to tracking list for which storage movement expected if
+ * necessary.
+ *
+ * @param itemInfo
+ * - item info of the file to track (carries its start id and file id)
+ * @param scanCompleted
+ * -Indicates whether the ItemInfo start id directory has no more
+ * elements to scan.
+ */
+ @VisibleForTesting
+ public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
+ storageMovementNeeded.add(itemInfo);
+ // This represents sps start id is file, so no need to update pending dir
+ // stats.
+ if (itemInfo.getStartId() == itemInfo.getFileId()) {
+ return;
+ }
+ updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted);
+ }
+
+ private void updatePendingDirScanStats(long startId, int numScannedFiles,
+ boolean scanCompleted) {
DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
if (pendingWork == null) {
pendingWork = new DirPendingWorkInfo();
pendingWorkForDirectory.put(startId, pendingWork);
}
- pendingWork.addPendingWorkCount(itemInfoList.size());
+ pendingWork.addPendingWorkCount(numScannedFiles);
if (scanCompleted) {
pendingWork.markScanCompleted();
}
@@ -250,13 +280,15 @@ public class BlockStorageMovementNeeded {
@Override
public void run() {
- LOG.info("Starting FileInodeIdCollector!.");
+ LOG.info("Starting SPSPathIdProcessor!.");
long lastStatusCleanTime = 0;
+ Long startINodeId = null;
while (ctxt.isRunning()) {
- LOG.info("Running FileInodeIdCollector!.");
try {
if (!ctxt.isInSafeMode()) {
- Long startINodeId = ctxt.getNextSPSPathId();
+ if (startINodeId == null) {
+ startINodeId = ctxt.getNextSPSPathId();
+ } // else same id will be retried
if (startINodeId == null) {
// Waiting for SPS path
Thread.sleep(3000);
@@ -281,9 +313,18 @@ public class BlockStorageMovementNeeded {
lastStatusCleanTime = Time.monotonicNow();
cleanSpsStatus();
}
+ startINodeId = null; // Current inode id successfully scanned.
}
} catch (Throwable t) {
- LOG.warn("Exception while loading inodes to satisfy the policy", t);
+ String reClass = t.getClass().getName();
+ if (InterruptedException.class.getName().equals(reClass)) {
+ LOG.info("SPSPathIdProcessor thread is interrupted. Stopping..");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ LOG.warn("Exception while scanning file inodes to satisfy the policy",
+ t);
+ // TODO: may be we should retry the current inode id?
}
}
}
@@ -426,4 +467,11 @@ public class BlockStorageMovementNeeded {
public static long getStatusClearanceElapsedTimeMs() {
return statusClearanceElapsedTimeMs;
}
+
+ public void markScanCompletedForDir(Long inodeId) {
+ DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId);
+ if (pendingWork != null) {
+ pendingWork.markScanCompleted();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index b7053b9..f103dfe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -167,4 +167,12 @@ public interface Context {
*/
void removeAllSPSPathIds();
+ /**
+ * Gets the file path for a given inode id.
+ *
+ * @param inodeId
+ * - path inode id.
+ */
+ String getFilePath(Long inodeId);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
index 1da4af9..b27e8c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockM
* This class handles the internal SPS block movements. This will assign block
* movement tasks to target datanode descriptors.
*/
+@InterfaceAudience.Private
public class IntraSPSNameNodeBlockMoveTaskHandler
implements BlockMoveTaskHandler {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index cef26ed..aed684a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SAT
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
* are expecting to change its storages and assigning the block storage
* movements to satisfy the storage policy.
*/
+@InterfaceAudience.Private
public class IntraSPSNameNodeContext implements Context {
private static final Logger LOG = LoggerFactory
.getLogger(IntraSPSNameNodeContext.class);
@@ -195,4 +197,9 @@ public class IntraSPSNameNodeContext implements Context {
public void removeAllSPSPathIds() {
blockManager.removeAllSPSPathIds();
}
+
+ @Override
+ public String getFilePath(Long inodeId) {
+ return namesystem.getFilePath(inodeId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index c6834c1..f7cd754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
* A specific implementation for scanning the directory with Namenode internal
* Inode structure and collects the file ids under the given directory ID.
*/
+@InterfaceAudience.Private
public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
implements FileIdCollector {
private int maxQueueLimitToScan;
@@ -131,12 +133,12 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
} else {
readLock();
- // NOTE: this lock will not be held until full directory scanning. It is
+ // NOTE: this lock will not be held for full directory scanning. It is
// basically a sliced locking. Once it collects a batch size( at max the
// size of maxQueueLimitToScan (default 1000)) file ids, then it will
// unlock and submits the current batch to SPSService. Once
// service.processingQueueSize() shows empty slots, then lock will be
- // resumed and scan also will be resumed. This logic was re-used from
+ // re-acquired and scan will be resumed. This logic was re-used from
// EDEK feature.
try {
traverseDir(startInode.asDirectory(), startINodeId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index 6d85ea6..d74e391 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -80,7 +80,7 @@ public interface SPSService {
*
* @param itemInfo
*/
- void addFileIdToProcess(ItemInfo itemInfo);
+ void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted);
/**
* Adds all the Item information(file id etc) to processing queue.
@@ -104,4 +104,12 @@ public interface SPSService {
* @return the configuration.
*/
Configuration getConf();
+
+ /**
+ * Marks the scanning of the directory as finished.
+ *
+ * @param inodeId
+ * - directory inode id.
+ */
+ void markScanCompletedForPath(Long inodeId);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 28c1372..aafdc65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -563,7 +563,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
chosenTarget.storageType, blockMovingInfos);
}
expected.remove(chosenTarget.storageType);
- // TODO: We can increment scheduled block count for this node?
}
}
// To avoid choosing this excludeNodes as targets later
@@ -924,7 +923,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@Override
- public void addFileIdToProcess(ItemInfo trackInfo) {
+ public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo);
}
@@ -948,4 +947,9 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
public BlockStorageMovementNeeded getStorageMovementQueue() {
return storageMovementNeeded;
}
+
+ @Override
+ public void markScanCompletedForPath(Long inodeId) {
+ getStorageMovementQueue().markScanCompletedForDir(inodeId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
new file mode 100644
index 0000000..597a7d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
@@ -0,0 +1,156 @@
+package org.apache.hadoop.hdfs.server.sps;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
+import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class scans the given paths recursively. If a path is a directory, it
+ * scans for files under it recursively. If the path is not a directory, it
+ * just submits that same file for processing.
+ */
+@InterfaceAudience.Private
+public class ExternalSPSFileIDCollector implements FileIdCollector {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(ExternalSPSFileIDCollector.class);
+ private Context cxt;
+ private DistributedFileSystem dfs;
+ private SPSService service;
+ private int maxQueueLimitToScan;
+
+ public ExternalSPSFileIDCollector(Context cxt, SPSService service,
+ int batchSize) {
+ this.cxt = cxt;
+ this.service = service;
+ this.maxQueueLimitToScan = service.getConf().getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+ try {
+ // TODO: probably we could get this dfs from external context? but this is
+ // too specific to external.
+ dfs = getFS(service.getConf());
+ } catch (IOException e) {
+ LOG.error("Unable to get the filesystem. Make sure Namenode running and "
+ + "configured namenode address is correct.", e);
+ }
+ }
+
+ private DistributedFileSystem getFS(Configuration conf) throws IOException {
+ return (DistributedFileSystem) FileSystem
+ .get(FileSystem.getDefaultUri(conf), conf);
+ }
+
+ /**
+ * Recursively scan the given path and add the file info to SPS service for
+ * processing.
+ */
+ private void processPath(long startID, String fullPath) {
+ for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
+ final DirectoryListing children;
+ try {
+ children = dfs.getClient().listPaths(fullPath, lastReturnedName, false);
+ } catch (IOException e) {
+ LOG.warn("Failed to list directory " + fullPath
+ + ". Ignore the directory and continue.", e);
+ return;
+ }
+ if (children == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The scanning start dir/sub dir " + fullPath
+ + " does not have childrens.");
+ }
+ return;
+ }
+
+ for (HdfsFileStatus child : children.getPartialListing()) {
+ if (child.isFile()) {
+ service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()),
+ false);
+ checkProcessingQueuesFree();
+ } else {
+ String fullPathStr = child.getFullName(fullPath);
+ if (child.isDirectory()) {
+ if (!fullPathStr.endsWith(Path.SEPARATOR)) {
+ fullPathStr = fullPathStr + Path.SEPARATOR;
+ }
+ processPath(startID, fullPathStr);
+ }
+ }
+ }
+
+ if (children.hasMore()) {
+ lastReturnedName = children.getLastName();
+ } else {
+ return;
+ }
+ }
+ }
+
+ private void checkProcessingQueuesFree() {
+ int remainingCapacity = remainingCapacity();
+ // wait for queue to be free
+ while (remainingCapacity <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ remainingCapacity = remainingCapacity();
+ }
+ }
+
+ /**
+ * Returns queue remaining capacity.
+ */
+ public int remainingCapacity() {
+ int size = service.processingQueueSize();
+ if (size >= maxQueueLimitToScan) {
+ return 0;
+ } else {
+ return (maxQueueLimitToScan - size);
+ }
+ }
+
+ @Override
+ public void scanAndCollectFileIds(Long inodeId) throws IOException {
+ if (dfs == null) {
+ dfs = getFS(service.getConf());
+ }
+ processPath(inodeId, cxt.getFilePath(inodeId));
+ service.markScanCompletedForPath(inodeId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java
new file mode 100644
index 0000000..f705df2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides a mechanism for satisfying the storage policy of a
+ * path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.hdfs.server.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 9354044..e0bf410 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,18 +94,41 @@ public class TestStoragePolicySatisfier {
private static final String COLD = "COLD";
private static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
- private final Configuration config = new HdfsConfiguration();
+ private Configuration config = null;
private StorageType[][] allDiskTypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}};
private MiniDFSCluster hdfsCluster = null;
- final private int numOfDatanodes = 3;
- final private int storagesPerDatanode = 2;
- final private long capacity = 2 * 256 * 1024 * 1024;
- final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
private DistributedFileSystem dfs = null;
- private static final int DEFAULT_BLOCK_SIZE = 1024;
+ public static final int NUM_OF_DATANODES = 3;
+ public static final int STORAGES_PER_DATANODE = 2;
+ public static final long CAPACITY = 2 * 256 * 1024 * 1024;
+ public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying";
+ public static final int DEFAULT_BLOCK_SIZE = 1024;
+
+ /**
+ * Sets hdfs cluster.
+ */
+ public void setCluster(MiniDFSCluster cluster) {
+ this.hdfsCluster = cluster;
+ }
+
+ /**
+ * @return conf.
+ */
+ public Configuration getConf() {
+ return this.config;
+ }
+
+ /**
+ * Gets distributed file system.
+ *
+ * @throws IOException
+ */
+ public void getFS() throws IOException {
+ this.dfs = hdfsCluster.getFileSystem();
+ }
@After
public void shutdownCluster() {
@@ -113,14 +137,19 @@ public class TestStoragePolicySatisfier {
}
}
- private void createCluster() throws IOException {
+ public void createCluster() throws IOException {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
- hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
- storagesPerDatanode, capacity);
- dfs = hdfsCluster.getFileSystem();
- writeContent(file);
+ hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES,
+ STORAGES_PER_DATANODE, CAPACITY);
+ getFS();
+ writeContent(FILE);
+ }
+
+ @Before
+ public void setUp() {
+ config = new HdfsConfiguration();
}
@Test(timeout = 300000)
@@ -137,19 +166,19 @@ public class TestStoragePolicySatisfier {
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
// Change policy to COLD
- dfs.setStoragePolicy(new Path(file), COLD);
+ dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
- startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsCluster.triggerHeartbeats();
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
// Wait till namenode notified about the block location details
- DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000,
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000,
dfs);
}
@@ -159,7 +188,7 @@ public class TestStoragePolicySatisfier {
try {
createCluster();
// Change policy to ALL_SSD
- dfs.setStoragePolicy(new Path(file), "ALL_SSD");
+ dfs.setStoragePolicy(new Path(FILE), "ALL_SSD");
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK},
@@ -168,14 +197,13 @@ public class TestStoragePolicySatisfier {
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
- startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
- dfs.satisfyStoragePolicy(new Path(file));
+ startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.SSD, 3, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs);
} finally {
shutdownCluster();
}
@@ -187,23 +215,22 @@ public class TestStoragePolicySatisfier {
try {
createCluster();
// Change policy to ONE_SSD
- dfs.setStoragePolicy(new Path(file), ONE_SSD);
+ dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
- dfs.satisfyStoragePolicy(new Path(file));
+ startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StorgePolicySatisfier Identified that block to move to SSD
// areas
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.SSD, 1, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 2, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+ dfs);
} finally {
shutdownCluster();
}
@@ -218,23 +245,22 @@ public class TestStoragePolicySatisfier {
try {
createCluster();
// Change policy to ONE_SSD
- dfs.setStoragePolicy(new Path(file), ONE_SSD);
+ dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
- dfs.satisfyStoragePolicy(new Path(file));
+ startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till the block is moved to SSD areas
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.SSD, 1, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 2, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+ dfs);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
@@ -251,7 +277,7 @@ public class TestStoragePolicySatisfier {
try {
createCluster();
List<String> files = new ArrayList<>();
- files.add(file);
+ files.add(FILE);
// Creates 4 more files. Send all of them for satisfying the storage
// policy together.
@@ -271,8 +297,8 @@ public class TestStoragePolicySatisfier {
// Making sure SDD based nodes added to cluster. Adding SSD based
// datanodes.
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsCluster.triggerHeartbeats();
for (String fileName : files) {
@@ -300,21 +326,21 @@ public class TestStoragePolicySatisfier {
HdfsAdmin hdfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
// Change policy to COLD
- dfs.setStoragePolicy(new Path(file), COLD);
+ dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}};
- startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
- hdfsAdmin.satisfyStoragePolicy(new Path(file));
+ hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till namenode notified about the block location details
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.ARCHIVE, 3, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000,
+ dfs);
} finally {
shutdownCluster();
}
@@ -344,8 +370,8 @@ public class TestStoragePolicySatisfier {
StorageType[][] newtypes =
new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
hdfsAdmin.satisfyStoragePolicy(new Path(subDir));
@@ -384,11 +410,11 @@ public class TestStoragePolicySatisfier {
new HdfsAdmin(FileSystem.getDefaultUri(config), config);
try {
- hdfsAdmin.satisfyStoragePolicy(new Path(file));
+ hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
Assert.fail(String.format(
"Should failed to satisfy storage policy "
+ "for %s since %s is set to false.",
- file, DFS_STORAGE_POLICY_ENABLED_KEY));
+ FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(String.format(
"Failed to satisfy storage policy since %s is set to false.",
@@ -409,17 +435,17 @@ public class TestStoragePolicySatisfier {
}
try {
- hdfsAdmin.satisfyStoragePolicy(new Path(file));
- hdfsAdmin.satisfyStoragePolicy(new Path(file));
- Assert.fail(String.format(
- "Should failed to satisfy storage policy "
- + "for %s ,since it has been "
- + "added to satisfy movement queue.", file));
+ hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+ hdfsAdmin.satisfyStoragePolicy(new Path(FILE));
+ Assert.fail(String.format("Should failed to satisfy storage policy "
+ + "for %s ,since it has been " + "added to satisfy movement queue.",
+ FILE));
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
String.format("Cannot request to call satisfy storage policy "
+ "on path %s, as this file/dir was already called for "
- + "satisfying storage policy.", file), e);
+ + "satisfying storage policy.", FILE),
+ e);
}
} finally {
shutdownCluster();
@@ -446,23 +472,23 @@ public class TestStoragePolicySatisfier {
try {
createCluster();
// Change policy to COLD
- dfs.setStoragePolicy(new Path(file), COLD);
+ dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
// Adding ARCHIVE based datanodes.
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StoragePolicySatisfier identified that block to move to
// ARCHIVE area.
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.ARCHIVE, 1, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 2, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
+ dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+ dfs);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
@@ -489,22 +515,22 @@ public class TestStoragePolicySatisfier {
try {
createCluster();
// Change policy to COLD
- dfs.setStoragePolicy(new Path(file), COLD);
+ dfs.setStoragePolicy(new Path(FILE), COLD);
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
// Adding DISK based datanodes
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// No block movement will be scheduled as there is no target node
// available with the required storage type.
waitForAttemptedItems(1, 30000);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 3, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+ dfs);
// Since there is no target node the item will get timed out and then
// re-attempted.
waitForAttemptedItems(1, 30000);
@@ -628,8 +654,8 @@ public class TestStoragePolicySatisfier {
{StorageType.ARCHIVE, StorageType.ARCHIVE},
{StorageType.ARCHIVE, StorageType.ARCHIVE}};
// Adding ARCHIVE based datanodes
- startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
dfs.satisfyStoragePolicy(new Path(file1));
hdfsCluster.triggerHeartbeats();
@@ -682,21 +708,21 @@ public class TestStoragePolicySatisfier {
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.ARCHIVE}};
hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
- writeContent(file, (short) 5);
+ writeContent(FILE, (short) 5);
// Change policy to COLD
- dfs.setStoragePolicy(new Path(file), COLD);
+ dfs.setStoragePolicy(new Path(FILE), COLD);
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
// Wait till StoragePolicySatisfier identified that block to move to
// ARCHIVE area.
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.ARCHIVE, 2, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 3, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
+ dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+ dfs);
waitForBlocksMovementAttemptReport(1, 30000);
} finally {
@@ -720,20 +746,19 @@ public class TestStoragePolicySatisfier {
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
try {
- hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
- storagesPerDatanode, capacity);
+ hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
- writeContent(file);
+ writeContent(FILE);
// Change policy to ONE_SSD
- dfs.setStoragePolicy(new Path(file), ONE_SSD);
+ dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.SSD, 1, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 2, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+ dfs);
} finally {
shutdownCluster();
@@ -760,19 +785,19 @@ public class TestStoragePolicySatisfier {
true);
try {
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
- writeContent(file);
+ writeContent(FILE);
// Change policy to WARM
- dfs.setStoragePolicy(new Path(file), "WARM");
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.setStoragePolicy(new Path(FILE), "WARM");
+ dfs.satisfyStoragePolicy(new Path(FILE));
hdfsCluster.triggerHeartbeats();
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.DISK, 1, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(
- file, StorageType.ARCHIVE, 2, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000,
+ dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000,
+ dfs);
} finally {
shutdownCluster();
}
@@ -794,31 +819,31 @@ public class TestStoragePolicySatisfier {
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
// 1. Write two replica on disk
- DFSTestUtil.createFile(dfs, new Path(file), DEFAULT_BLOCK_SIZE,
+ DFSTestUtil.createFile(dfs, new Path(FILE), DEFAULT_BLOCK_SIZE,
(short) 2, 0);
// 2. Change policy to COLD, so third replica will be written to ARCHIVE.
- dfs.setStoragePolicy(new Path(file), "COLD");
+ dfs.setStoragePolicy(new Path(FILE), "COLD");
// 3.Change replication factor to 3.
- dfs.setReplication(new Path(file), (short) 3);
+ dfs.setReplication(new Path(FILE), (short) 3);
- DFSTestUtil
- .waitExpectedStorageType(file, StorageType.DISK, 2, 30000, dfs);
- DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000,
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000,
+ dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000,
dfs);
// 4. Change policy to HOT, so we can move the all block to DISK.
- dfs.setStoragePolicy(new Path(file), "HOT");
+ dfs.setStoragePolicy(new Path(FILE), "HOT");
// 4. Satisfy the policy.
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
// 5. Block should move successfully .
- DFSTestUtil
- .waitExpectedStorageType(file, StorageType.DISK, 3, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
+ dfs);
} finally {
shutdownCluster();
}
@@ -840,13 +865,13 @@ public class TestStoragePolicySatisfier {
true);
long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1);
try {
- hdfsCluster = startCluster(config, diskTypes, numOfDatanodes,
- storagesPerDatanode, dnCapacity);
+ hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES,
+ STORAGES_PER_DATANODE, dnCapacity);
dfs = hdfsCluster.getFileSystem();
- writeContent(file);
+ writeContent(FILE);
// Change policy to ONE_SSD
- dfs.setStoragePolicy(new Path(file), ONE_SSD);
+ dfs.setStoragePolicy(new Path(FILE), ONE_SSD);
Path filePath = new Path("/testChooseInSameDatanode");
final FSDataOutputStream out =
dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE);
@@ -869,7 +894,7 @@ public class TestStoragePolicySatisfier {
for (DataNode dataNode : dataNodes) {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true);
}
- dfs.satisfyStoragePolicy(new Path(file));
+ dfs.satisfyStoragePolicy(new Path(FILE));
// Wait for items to be processed
waitForAttemptedItems(1, 30000);
@@ -887,9 +912,9 @@ public class TestStoragePolicySatisfier {
}
hdfsCluster.triggerHeartbeats();
- DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000,
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000,
dfs);
- DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs);
+ DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs);
} finally {
shutdownCluster();
}
@@ -928,7 +953,7 @@ public class TestStoragePolicySatisfier {
true);
try {
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
dfs.enableErasureCodingPolicy(
StripedFileTestUtil.getDefaultECPolicy().getName());
@@ -1029,8 +1054,7 @@ public class TestStoragePolicySatisfier {
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK},
{StorageType.ARCHIVE, StorageType.DISK}};
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
- .storageTypes(newtypes).build();
+ cluster = startCluster(conf, newtypes, 3, 2, CAPACITY);
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
Path filePath = new Path("/zeroSizeFile");
@@ -1211,7 +1235,7 @@ public class TestStoragePolicySatisfier {
config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
true);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
@@ -1245,7 +1269,7 @@ public class TestStoragePolicySatisfier {
config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
5);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
List<String> files = getDFSListOfTree();
@@ -1284,7 +1308,7 @@ public class TestStoragePolicySatisfier {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
@@ -1312,8 +1336,7 @@ public class TestStoragePolicySatisfier {
}
};
- FileIdCollector fileIDCollector =
- new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+ FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null);
sps.getStorageMovementQueue().activate();
@@ -1323,31 +1346,20 @@ public class TestStoragePolicySatisfier {
//Wait for thread to reach U.
Thread.sleep(1000);
-
dfs.delete(new Path("/root/D/L"), true);
- // Remove 10 element and make queue free, So other traversing will start.
- for (int i = 0; i < 10; i++) {
- String path = expectedTraverseOrder.remove(0);
- long trackId = sps.getStorageMovementQueue().get().getFileId();
- INode inode = fsDir.getInode(trackId);
- assertTrue("Failed to traverse tree, expected " + path + " but got "
- + inode.getFullPathName(), path.equals(inode.getFullPathName()));
- }
- //Wait to finish tree traverse
- Thread.sleep(5000);
- // Check other element traversed in order and R,S should not be added in
- // queue which we already removed from expected list
- for (String path : expectedTraverseOrder) {
- long trackId = sps.getStorageMovementQueue().get().getFileId();
- INode inode = fsDir.getInode(trackId);
- assertTrue("Failed to traverse tree, expected " + path + " but got "
- + inode.getFullPathName(), path.equals(inode.getFullPathName()));
- }
+ assertTraversal(expectedTraverseOrder, fsDir, sps);
dfs.delete(new Path("/root"), true);
}
+ public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
+ Context ctxt) {
+ FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector(
+ hdfsCluster.getNamesystem().getFSDirectory(), sps);
+ return fileIDCollector;
+ }
+
/**
* Test traverse when root parent got deleted.
* 1. Delete L when traversing Q
@@ -1362,7 +1374,7 @@ public class TestStoragePolicySatisfier {
config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10);
hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
- storagesPerDatanode, capacity);
+ STORAGES_PER_DATANODE, CAPACITY);
dfs = hdfsCluster.getFileSystem();
createDirectoryTree(dfs);
@@ -1378,7 +1390,6 @@ public class TestStoragePolicySatisfier {
// Queue limit can control the traverse logic to wait for some free
// entry in queue. After 10 files, traverse control will be on U.
- // StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
StoragePolicySatisfier sps = new StoragePolicySatisfier(config);
Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(),
hdfsCluster.getNamesystem().getBlockManager(), sps) {
@@ -1392,9 +1403,7 @@ public class TestStoragePolicySatisfier {
return true;
}
};
-
- FileIdCollector fileIDCollector =
- new IntraSPSNameNodeFileIdCollector(fsDir, sps);
+ FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null);
sps.getStorageMovementQueue().activate();
@@ -1407,6 +1416,13 @@ public class TestStoragePolicySatisfier {
dfs.delete(new Path("/root/D/L"), true);
+ assertTraversal(expectedTraverseOrder, fsDir, sps);
+ dfs.delete(new Path("/root"), true);
+ }
+
+ private void assertTraversal(List<String> expectedTraverseOrder,
+ FSDirectory fsDir, StoragePolicySatisfier sps)
+ throws InterruptedException {
// Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
@@ -1426,7 +1442,6 @@ public class TestStoragePolicySatisfier {
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
}
- dfs.delete(new Path("/root"), true);
}
/**
@@ -1473,8 +1488,8 @@ public class TestStoragePolicySatisfier {
StorageType[][] newtypes =
new StorageType[][]{{StorageType.DISK, StorageType.SSD},
{StorageType.DISK, StorageType.SSD}};
- startAdditionalDNs(config, 2, numOfDatanodes, newtypes,
- storagesPerDatanode, capacity, hdfsCluster);
+ startAdditionalDNs(config, 2, NUM_OF_DATANODES, newtypes,
+ STORAGES_PER_DATANODE, CAPACITY, hdfsCluster);
// increase replication factor to 4 for the first 10 files and thus
// initiate replica tasks
@@ -1772,7 +1787,7 @@ public class TestStoragePolicySatisfier {
}, 100, timeout);
}
- private void writeContent(final String fileName) throws IOException {
+ public void writeContent(final String fileName) throws IOException {
writeContent(fileName, (short) 3);
}
@@ -1805,7 +1820,7 @@ public class TestStoragePolicySatisfier {
cluster.triggerHeartbeats();
}
- private MiniDFSCluster startCluster(final Configuration conf,
+ public MiniDFSCluster startCluster(final Configuration conf,
StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
long nodeCapacity) throws IOException {
long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/35063065/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
new file mode 100644
index 0000000..3ced34e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
+import org.junit.Ignore;
+
+/**
+ * Tests the external SPS service plugins.
+ */
+public class TestExternalStoragePolicySatisfier
+ extends TestStoragePolicySatisfier {
+ private StorageType[][] allDiskTypes =
+ new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK}};
+
+ @Override
+ public void createCluster() throws IOException {
+ getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+ getConf().setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ true);
+ setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES,
+ STORAGES_PER_DATANODE, CAPACITY));
+ getFS();
+ writeContent(FILE);
+ }
+
+ @Override
+ public MiniDFSCluster startCluster(final Configuration conf,
+ StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+ long nodeCapacity) throws IOException {
+ long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+ for (int i = 0; i < numberOfDatanodes; i++) {
+ for (int j = 0; j < storagesPerDn; j++) {
+ capacities[i][j] = nodeCapacity;
+ }
+ }
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+ .storageTypes(storageTypes).storageCapacities(capacities).build();
+ cluster.waitActive();
+ if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+ false)) {
+ SPSService spsService = cluster.getNameNode().getNamesystem()
+ .getBlockManager().getSPSService();
+ spsService.stopGracefully();
+
+ IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
+ cluster.getNameNode().getNamesystem(),
+ cluster.getNameNode().getNamesystem().getBlockManager(), cluster
+ .getNameNode().getNamesystem().getBlockManager().getSPSService());
+
+ spsService.init(context,
+ new ExternalSPSFileIDCollector(context,
+ cluster.getNameNode().getNamesystem().getBlockManager()
+ .getSPSService(),
+ 5),
+ new IntraSPSNameNodeBlockMoveTaskHandler(
+ cluster.getNameNode().getNamesystem().getBlockManager(),
+ cluster.getNameNode().getNamesystem()));
+ spsService.start(true);
+ }
+ return cluster;
+ }
+
+ @Override
+ public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps,
+ Context ctxt) {
+ return new ExternalSPSFileIDCollector(ctxt, sps, 5);
+ }
+
+ /**
+ * This test does not need to run because the external scan is not batch
+ * based right now.
+ */
+ @Ignore("ExternalFileIdCollector is not batch based right now."
+ + " So, ignoring it.")
+ public void testBatchProcessingForSPSDirectory() throws Exception {
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org