You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/12 10:22:19 UTC
[36/50] [abbrv] hadoop git commit: HDFS-13050: [SPS]: Create
start/stop script to start external SPS process. Contributed by Surendra
Singh Lilhore.
HDFS-13050: [SPS]: Create start/stop script to start external SPS process. Contributed by Surendra Singh Lilhore.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5845c36c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5845c36c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5845c36c
Branch: refs/heads/HDFS-10285
Commit: 5845c36c16c423107183287cce3be9357dad7564
Parents: 99594b4
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Mon Jan 29 03:10:48 2018 +0530
Committer: Uma Maheswara Rao Gangumalla <um...@apache.org>
Committed: Sun Aug 12 03:06:04 2018 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/src/main/bin/hdfs | 5 +
.../server/blockmanagement/BlockManager.java | 9 ++
.../apache/hadoop/hdfs/server/mover/Mover.java | 2 +-
.../hdfs/server/namenode/sps/Context.java | 5 -
.../namenode/sps/IntraSPSNameNodeContext.java | 4 -
.../sps/IntraSPSNameNodeFileIdCollector.java | 12 +-
.../hdfs/server/namenode/sps/SPSPathIds.java | 1 +
.../namenode/sps/StoragePolicySatisfier.java | 83 +++++++-----
.../sps/ExternalSPSBlockMoveTaskHandler.java | 2 +-
.../hdfs/server/sps/ExternalSPSContext.java | 57 +-------
.../server/sps/ExternalSPSFileIDCollector.java | 12 +-
.../sps/ExternalStoragePolicySatisfier.java | 130 +++++++++++++++++++
.../src/site/markdown/ArchivalStorage.md | 10 +-
.../sps/TestStoragePolicySatisfier.java | 22 ++--
.../sps/TestExternalStoragePolicySatisfier.java | 33 +++--
15 files changed, 259 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index bc6e7a4..94426a5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -63,6 +63,7 @@ function hadoop_usage
hadoop_add_subcommand "secondarynamenode" daemon "run the DFS secondary namenode"
hadoop_add_subcommand "snapshotDiff" client "diff two snapshots of a directory or diff the current directory contents with a snapshot"
hadoop_add_subcommand "storagepolicies" admin "list/get/set/satisfyStoragePolicy block storage policies"
+ hadoop_add_subcommand "sps" daemon "run external storagepolicysatisfier"
hadoop_add_subcommand "version" client "print the version"
hadoop_add_subcommand "zkfc" daemon "run the ZK Failover Controller daemon"
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" false
@@ -201,6 +202,10 @@ function hdfscmd_case
storagepolicies)
HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.StoragePolicyAdmin
;;
+ sps)
+ HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
+ HADOOP_CLASSNAME=org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier
+ ;;
version)
HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
;;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ac6d44b..4ea64a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
@@ -5106,9 +5109,15 @@ public class BlockManager implements BlockStatsMXBean {
return;
}
updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
+ sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps),
+ new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
+ sps),
+ new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null);
sps.start(true, spsMode);
}
+
+
/**
* Enable storage policy satisfier by starting its service.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index b4e9716..2cc0e27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -672,7 +672,7 @@ public class Mover {
}
if (spsRunning) {
System.err.println("Mover failed due to StoragePolicySatisfier"
- + " is running. Exiting with status "
+ + " service running inside namenode. Exiting with status "
+ ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index bddbc1b..ff4ad6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -175,9 +175,4 @@ public interface Context {
*/
String getFilePath(Long inodeId);
- /**
- * Close the resources.
- */
- void close() throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 191886c..ff6cc21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -196,8 +196,4 @@ public class IntraSPSNameNodeContext implements Context {
return namesystem.getFilePath(inodeId);
}
- @Override
- public void close() throws IOException {
- // Nothing to clean.
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index f7cd754..7a44dd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -158,11 +158,15 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
*/
public synchronized int remainingCapacity() {
int size = service.processingQueueSize();
- if (size >= maxQueueLimitToScan) {
- return 0;
- } else {
- return (maxQueueLimitToScan - size);
+ int remainingSize = 0;
+ if (size < maxQueueLimitToScan) {
+ remainingSize = maxQueueLimitToScan - size;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+ + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
+ }
+ return remainingSize;
}
class SPSTraverseInfo extends TraverseInfo {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
index cd6ad22..e0f4999 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
public class SPSPathIds {
// List of pending dir to satisfy the policy
+ // TODO: Make this bounded queue.
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 89799fc..4ddfe2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -174,10 +175,11 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
return;
}
if (reconfigStart) {
- LOG.info("Starting StoragePolicySatisfier, as admin requested to "
- + "start it.");
+ LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
+ + "start it.", StringUtils.toLowerCase(spsMode.toString()));
} else {
- LOG.info("Starting StoragePolicySatisfier.");
+ LOG.info("Starting {} StoragePolicySatisfier.",
+ StringUtils.toLowerCase(spsMode.toString()));
}
// Ensure that all the previously submitted block movements(if any) have to
@@ -243,7 +245,14 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
@Override
public void run() {
- while (ctxt.isRunning()) {
+ while (isRunning) {
+ // Check if dependent service is running
+ if (!ctxt.isRunning()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Upstream service is down, skipping the sps work.");
+ }
+ continue;
+ }
try {
if (!ctxt.isInSafeMode()) {
ItemInfo itemInfo = storageMovementNeeded.get();
@@ -284,33 +293,39 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
// Just add to monitor, so it will be tracked for report and
// be removed on storage movement attempt finished report.
case BLOCKS_TARGETS_PAIRED:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block analysis status:{} for the file path:{}."
+ + " Adding to attempt monitor queue for the storage "
+ + "movement attempt finished report",
+ status.status, fileStatus.getPath());
+ }
this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
.getStartId(), itemInfo.getFileId(), monotonicNow(),
status.assignedBlocks, itemInfo.getRetryCount()));
break;
case NO_BLOCKS_TARGETS_PAIRED:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID " + trackId
- + " back to retry queue as none of the blocks"
- + " found its eligible targets.");
+ LOG.debug("Adding trackID:{} for the file path:{} back to"
+ + " retry queue as none of the blocks found its eligible"
+ + " targets.", trackId, fileStatus.getPath());
}
itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID " + trackId
- + " back to retry queue as some of the blocks"
- + " are low redundant.");
+ LOG.debug("Adding trackID:{} for the file path:{} back to "
+ + "retry queue as some of the blocks are low redundant.",
+ trackId, fileStatus.getPath());
}
itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case BLOCKS_FAILED_TO_MOVE:
if (LOG.isDebugEnabled()) {
- LOG.debug("Adding trackID " + trackId
- + " back to retry queue as some of the blocks"
- + " movement failed.");
+ LOG.debug("Adding trackID:{} for the file path:{} back to "
+ + "retry queue as some of the blocks movement failed.",
+ trackId, fileStatus.getPath());
}
this.storageMovementNeeded.add(itemInfo);
break;
@@ -318,8 +333,9 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
case BLOCKS_TARGET_PAIRING_SKIPPED:
case BLOCKS_ALREADY_SATISFIED:
default:
- LOG.info("Block analysis skipped or blocks already satisfied"
- + " with storages. So, Cleaning up the Xattrs.");
+ LOG.info("Block analysis status:{} for the file path:{}."
+ + " So, Cleaning up the Xattrs.", status.status,
+ fileStatus.getPath());
storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
break;
}
@@ -346,20 +362,20 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
if (isRunning) {
synchronized (this) {
if (isRunning) {
- isRunning = false;
- // Stopping monitor thread and clearing queues as well
- this.clearQueues();
- this.storageMovementsMonitor.stopGracefully();
- if (!(t instanceof InterruptedException)) {
- LOG.info("StoragePolicySatisfier received an exception"
- + " while shutting down.", t);
+ if (t instanceof InterruptedException) {
+ isRunning = false;
+ LOG.info("Stopping StoragePolicySatisfier.");
+ // Stopping monitor thread and clearing queues as well
+ this.clearQueues();
+ this.storageMovementsMonitor.stopGracefully();
+ } else {
+ LOG.error(
+ "StoragePolicySatisfier thread received runtime exception, "
+ + "ignoring", t);
}
- LOG.info("Stopping StoragePolicySatisfier.");
}
}
}
- LOG.error("StoragePolicySatisfier thread received runtime exception. "
- + "Stopping Storage policy satisfier work", t);
return;
}
@@ -374,9 +390,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
if (!lastBlkComplete) {
// Postpone, currently file is under construction
- // So, should we add back? or leave it to user
- LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
- + " this to the next retry iteration", fileInfo.getFileId());
+ LOG.info("File: {} is under construction. So, postpone"
+ + " this to the next retry iteration", fileInfo.getPath());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
new ArrayList<>());
@@ -384,8 +399,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
if (blocks.size() == 0) {
- LOG.info("BlockCollectionID: {} file is not having any blocks."
- + " So, skipping the analysis.", fileInfo.getFileId());
+ LOG.info("File: {} is not having any blocks."
+ + " So, skipping the analysis.", fileInfo.getPath());
return new BlocksMovingAnalysis(
BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
new ArrayList<>());
@@ -970,4 +985,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
public void markScanCompletedForPath(Long inodeId) {
getStorageMovementQueue().markScanCompletedForDir(inodeId);
}
+
+ /**
+ * Join main SPS thread.
+ */
+ public void join() throws InterruptedException {
+ //TODO Add join here on SPS rpc server also
+ storagePolicySatisfierThread.join();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index a1c8eec..4a762649 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -110,7 +110,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
/**
* Initializes block movement tracker daemon and starts the thread.
*/
- void init() {
+ public void init() {
movementTrackerThread = new Daemon(this.blkMovementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
movementTrackerThread.start();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index e5b04ba..e3b3bbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -19,19 +19,13 @@
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -57,13 +51,12 @@ public class ExternalSPSContext implements Context {
LoggerFactory.getLogger(ExternalSPSContext.class);
private SPSService service;
private NameNodeConnector nnc = null;
- private Object nnConnectionLock = new Object();
private BlockStoragePolicySuite createDefaultSuite =
BlockStoragePolicySuite.createDefaultSuite();
- public ExternalSPSContext(SPSService service) {
+ public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
this.service = service;
- initializeNamenodeConnector();
+ this.nnc = nnc;
}
@Override
@@ -73,7 +66,6 @@ public class ExternalSPSContext implements Context {
@Override
public boolean isInSafeMode() {
- initializeNamenodeConnector();
try {
return nnc != null ? nnc.getDistributedFileSystem().isInSafeMode()
: false;
@@ -85,7 +77,6 @@ public class ExternalSPSContext implements Context {
@Override
public boolean isMoverRunning() {
- initializeNamenodeConnector();
try {
FSDataOutputStream out = nnc.getDistributedFileSystem()
.append(HdfsServerConstants.MOVER_ID_PATH);
@@ -101,7 +92,6 @@ public class ExternalSPSContext implements Context {
@Override
public long getFileID(String path) throws UnresolvedLinkException,
AccessControlException, ParentNotDirectoryException {
- initializeNamenodeConnector();
HdfsFileStatus fs = null;
try {
fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus(
@@ -121,7 +111,6 @@ public class ExternalSPSContext implements Context {
@Override
public boolean isFileExist(long inodeId) {
- initializeNamenodeConnector();
String filePath = null;
try {
filePath = getFilePath(inodeId);
@@ -145,14 +134,12 @@ public class ExternalSPSContext implements Context {
@Override
public void removeSPSHint(long inodeId) throws IOException {
- initializeNamenodeConnector();
nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
}
@Override
public int getNumLiveDataNodes() {
- initializeNamenodeConnector();
try {
return nnc.getDistributedFileSystem()
.getDataNodeStats(DatanodeReportType.LIVE).length;
@@ -164,7 +151,6 @@ public class ExternalSPSContext implements Context {
@Override
public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
- initializeNamenodeConnector();
return nnc.getDistributedFileSystem().getClient()
.getLocatedFileInfo(getFilePath(inodeID), false);
}
@@ -172,13 +158,11 @@ public class ExternalSPSContext implements Context {
@Override
public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException {
- initializeNamenodeConnector();
return nnc.getLiveDatanodeStorageReport();
}
@Override
public boolean hasLowRedundancyBlocks(long inodeID) {
- initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
} catch (IOException e) {
@@ -191,7 +175,6 @@ public class ExternalSPSContext implements Context {
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) {
- initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
estimatedSize);
@@ -204,7 +187,6 @@ public class ExternalSPSContext implements Context {
@Override
public Long getNextSPSPathId() {
- initializeNamenodeConnector();
try {
return nnc.getNNProtocolConnection().getNextSPSPathId();
} catch (IOException e) {
@@ -233,39 +215,4 @@ public class ExternalSPSContext implements Context {
return null;
}
}
-
- @Override
- public void close() throws IOException {
- synchronized (nnConnectionLock) {
- if (nnc != null) {
- nnc.close();
- }
- }
- }
-
- private void initializeNamenodeConnector() {
- synchronized (nnConnectionLock) {
- if (nnc == null) {
- try {
- nnc = getNameNodeConnector(service.getConf());
- } catch (IOException e) {
- LOG.warn("Exception while creating Namenode Connector.."
- + "Namenode might not have started.", e);
- }
- }
- }
- }
-
- public static NameNodeConnector getNameNodeConnector(Configuration conf)
- throws IOException {
- final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
- List<NameNodeConnector> nncs = Collections.emptyList();
- NameNodeConnector.checkOtherInstanceRunning(false);
- nncs = NameNodeConnector.newNameNodeConnectors(namenodes,
- ExternalSPSContext.class.getSimpleName(),
- HdfsServerConstants.MOVER_ID_PATH, conf,
- NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
- return nncs.get(0);
- }
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
index 964ee8c..ff277ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
@@ -139,11 +139,15 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
*/
public int remainingCapacity() {
int size = service.processingQueueSize();
- if (size >= maxQueueLimitToScan) {
- return 0;
- } else {
- return (maxQueueLimitToScan - size);
+ int remainingSize = 0;
+ if (size < maxQueueLimitToScan) {
+ remainingSize = maxQueueLimitToScan - size;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+ + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
}
+ return remainingSize;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
new file mode 100644
index 0000000..c64abc3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps;
+
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the external Storage Policy Satisfier (SPS) daemon: it wires
+ * a {@link StoragePolicySatisfier} to a namenode connection and runs it
+ * outside of the Namenode process.
+ */
+@InterfaceAudience.Private
+public class ExternalStoragePolicySatisfier {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(ExternalStoragePolicySatisfier.class);
+
+  /**
+   * Starts the external SPS service and blocks until its main thread ends.
+   * <p>
+   * Startup is refused when the Namenode reports that a StoragePolicySatisfier
+   * is already running inside it. Anything thrown inside the startup sequence
+   * is logged and the process is terminated with exit status 1.
+   *
+   * @param args command line arguments; only used for the startup message
+   * @throws Exception kept in the signature; failures raised inside the
+   *           {@code try} block are logged and lead to termination instead
+   */
+  public static void main(String[] args) throws Exception {
+    NameNodeConnector nnc = null;
+    try {
+      StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
+          LOG);
+      HdfsConfiguration spsConf = new HdfsConfiguration();
+      //TODO : login with SPS keytab
+      StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
+      nnc = getNameNodeConnector(spsConf);
+
+      // Refuse to start while the satisfier is running inside the Namenode.
+      boolean spsRunning = nnc.getDistributedFileSystem().getClient()
+          .isStoragePolicySatisfierRunning();
+      if (spsRunning) {
+        throw new RuntimeException(
+            "Startup failed due to StoragePolicySatisfier"
+                + " running inside Namenode.");
+      }
+
+      ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
+      ExternalBlockMovementListener blkMoveListener =
+          new ExternalBlockMovementListener();
+      ExternalSPSBlockMoveTaskHandler externalHandler =
+          new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
+      externalHandler.init();
+      sps.init(context, new ExternalSPSFileIDCollector(context, sps),
+          externalHandler, blkMoveListener);
+      sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
+      // sps was constructed above and cannot be null here, so join directly.
+      sps.join();
+    } catch (Throwable e) {
+      LOG.error("Failed to start storage policy satisfier.", e);
+      terminate(1, e);
+    } finally {
+      if (nnc != null) {
+        nnc.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a connector to the namenode, retrying every three seconds for as
+   * long as the attempt fails with an {@link IOException}.
+   *
+   * @param conf configuration used to resolve the namenode RPC URIs and to
+   *          create the connector
+   * @return the first connector returned by
+   *         {@code NameNodeConnector.newNameNodeConnectors}
+   * @throws IOException if raised outside of the retry loop, e.g. while
+   *           resolving the namenode URIs
+   * @throws InterruptedException if the thread is interrupted, e.g. while
+   *           sleeping between two connection attempts
+   */
+  private static NameNodeConnector getNameNodeConnector(Configuration conf)
+      throws IOException, InterruptedException {
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
+    while (true) {
+      try {
+        final List<NameNodeConnector> nncs = NameNodeConnector
+            .newNameNodeConnectors(namenodes,
+                StoragePolicySatisfier.class.getSimpleName(),
+                externalSPSPathId, conf,
+                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+        return nncs.get(0);
+      } catch (IOException e) {
+        LOG.warn("Failed to connect with namenode", e);
+        Thread.sleep(3000); // retry the connection after few secs
+      }
+    }
+  }
+
+  /**
+   * {@link BlockMovementListener} that records every block reported as
+   * movement-attempted and logs all blocks recorded so far.
+   */
+  private static class ExternalBlockMovementListener
+      implements BlockMovementListener {
+
+    // NOTE(review): this list is never cleared in the visible code, so it
+    // grows for the lifetime of the process - confirm whether it should be
+    // bounded or reset.
+    private final List<Block> actualBlockMovements = new ArrayList<>();
+
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      for (Block block : moveAttemptFinishedBlks) {
+        actualBlockMovements.add(block);
+      }
+      // The "{}" placeholder is required: without it SLF4J ignores the
+      // argument and the block list never reaches the log.
+      LOG.info("Movement attempted blocks: {}", actualBlockMovements);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index c10bfc3..25a6cd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -238,5 +238,13 @@ Check the running status of Storage Policy Satisfier service in namenode. If it
### Enable(internal service inside NN or external service outside NN) or Disable SPS without restarting Namenode
If administrator wants to switch modes of SPS feature while Namenode is running, first he/she needs to update the desired value(internal or external or none) for the configuration item `dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the following Namenode reconfig command
-+ hdfs dfsadmin -reconfig namenode <host:ipc_port> start
+* Command:
+
+ hdfs dfsadmin -reconfig namenode <host:ipc_port> start
+
+### Start External SPS Service.
+If the administrator wants to start the external SPS, he/she first needs to set the property `dfs.storage.policy.satisfier.mode` to `external` in the configuration file (`hdfs-site.xml`) and then run the Namenode reconfig command. After this, start the external SPS service using the following command:
+
+* Command:
+ hdfs --daemon start sps
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 935d4f2..135d996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -603,7 +603,7 @@ public class TestStoragePolicySatisfier {
if (out != null) {
out.close();
}
- hdfsCluster.shutdown();
+ shutdownCluster();
}
}
@@ -626,9 +626,7 @@ public class TestStoragePolicySatisfier {
Assert.assertTrue("SPS should be running as "
+ "no Mover really running", running);
} finally {
- if (hdfsCluster != null) {
- hdfsCluster.shutdown();
- }
+ shutdownCluster();
}
}
@@ -672,9 +670,7 @@ public class TestStoragePolicySatisfier {
DFSTestUtil.waitExpectedStorageType(
file1, StorageType.DISK, 2, 30000, dfs);
} finally {
- if (hdfsCluster != null) {
- hdfsCluster.shutdown();
- }
+ shutdownCluster();
}
}
@@ -1381,7 +1377,11 @@ public class TestStoragePolicySatisfier {
// Remove 10 element and make queue free, So other traversing will start.
for (int i = 0; i < 10; i++) {
String path = expectedTraverseOrder.remove(0);
- long trackId = sps.getStorageMovementQueue().get().getFileId();
+ ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+ if (itemInfo == null) {
+ continue;
+ }
+ long trackId = itemInfo.getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1392,7 +1392,11 @@ public class TestStoragePolicySatisfier {
// Check other element traversed in order and E, M, U, R, S should not be
// added in queue which we already removed from expected list
for (String path : expectedTraverseOrder) {
- long trackId = sps.getStorageMovementQueue().get().getFileId();
+ ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+ if (itemInfo == null) {
+ continue;
+ }
+ long trackId = itemInfo.getFileId();
INode inode = fsDir.getInode(trackId);
assertTrue("Failed to traverse tree, expected " + path + " but got "
+ inode.getFullPathName(), path.equals(inode.getFullPathName()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5845c36c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index fe08b8f..febc2ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -22,7 +22,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -43,8 +42,6 @@ import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
import org.junit.Assert;
import org.junit.Ignore;
-import com.google.common.collect.Maps;
-
/**
* Tests the external sps service plugins.
*/
@@ -95,7 +92,8 @@ public class TestExternalStoragePolicySatisfier
SPSService spsService = blkMgr.getSPSService();
spsService.stopGracefully();
- ExternalSPSContext context = new ExternalSPSContext(spsService);
+ ExternalSPSContext context = new ExternalSPSContext(spsService,
+ getNameNodeConnector(conf));
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
@@ -124,7 +122,8 @@ public class TestExternalStoragePolicySatisfier
spsService = blkMgr.getSPSService();
spsService.stopGracefully();
- ExternalSPSContext context = new ExternalSPSContext(spsService);
+ ExternalSPSContext context = new ExternalSPSContext(spsService,
+ getNameNodeConnector(getConf()));
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -161,16 +160,22 @@ public class TestExternalStoragePolicySatisfier
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
- Map<URI, List<Path>> nnMap = Maps.newHashMap();
- for (URI nn : namenodes) {
- nnMap.put(nn, null);
- }
final Path externalSPSPathId = new Path("/system/tmp.id");
- final List<NameNodeConnector> nncs = NameNodeConnector
- .newNameNodeConnectors(nnMap,
- StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
- conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
- return nncs.get(0);
+ NameNodeConnector.checkOtherInstanceRunning(false);
+ while (true) {
+ try {
+ final List<NameNodeConnector> nncs = NameNodeConnector
+ .newNameNodeConnectors(namenodes,
+ StoragePolicySatisfier.class.getSimpleName(),
+ externalSPSPathId, conf,
+ NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+ return nncs.get(0);
+ } catch (IOException e) {
+ LOG.warn("Failed to connect with namenode", e);
+ // Ignore
+ }
+
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org