You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/01/29 18:32:11 UTC
hadoop git commit: HDFS-13077. [SPS]: Fix review comments of external
storage policy satisfier. Contributed by Rakesh R.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-10285 3b0deb6be -> 3247dc52f
HDFS-13077. [SPS]: Fix review comments of external storage policy satisfier. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3247dc52
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3247dc52
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3247dc52
Branch: refs/heads/HDFS-10285
Commit: 3247dc52f7a547d863817058de37e8b4e1c3a226
Parents: 3b0deb6
Author: Surendra Singh Lilhore <su...@apache.org>
Authored: Mon Jan 29 23:59:55 2018 +0530
Committer: Surendra Singh Lilhore <su...@apache.org>
Committed: Mon Jan 29 23:59:55 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 +-
.../server/blockmanagement/BlockManager.java | 33 +++-
.../namenode/FSDirSatisfyStoragePolicyOp.java | 15 ++
.../hdfs/server/namenode/FSNamesystem.java | 41 ++--
.../hdfs/server/namenode/NameNodeRpcServer.java | 11 ++
.../hdfs/server/namenode/sps/SPSPathIds.java | 8 +-
.../namenode/sps/StoragePolicySatisfier.java | 6 +-
.../hdfs/server/sps/ExternalSPSContext.java | 4 +
.../sps/ExternalStoragePolicySatisfier.java | 30 ++-
.../sps/TestStoragePolicySatisfier.java | 7 +-
.../sps/TestExternalStoragePolicySatisfier.java | 195 ++++++++++++++++++-
11 files changed, 323 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 18024c7..355aa8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -598,7 +598,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
public static final int DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
- // SPS related configurations
+ // StoragePolicySatisfier (SPS) related configurations
public static final String DFS_STORAGE_POLICY_SATISFIER_MODE_KEY =
"dfs.storage.policy.satisfier.mode";
public static final String DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT =
@@ -627,6 +627,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.storage.policy.satisfier.low.max-streams.preference";
public static final boolean DFS_STORAGE_POLICY_SATISFIER_LOW_MAX_STREAMS_PREFERENCE_DEFAULT =
true;
+ public static final String DFS_SPS_MAX_OUTSTANDING_PATHS_KEY =
+ "dfs.storage.policy.satisfier.max.outstanding.paths";
+ public static final int DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT = 10000;
+
+ // SPS keytab configurations, by default it is disabled.
+ public static final String DFS_SPS_ADDRESS_KEY =
+ "dfs.storage.policy.satisfier.address";
+ public static final String DFS_SPS_ADDRESS_DEFAULT= "0.0.0.0:0";
+ public static final String DFS_SPS_KEYTAB_FILE_KEY =
+ "dfs.storage.policy.satisfier.keytab.file";
+ public static final String DFS_SPS_KERBEROS_PRINCIPAL_KEY =
+ "dfs.storage.policy.satisfier.kerberos.principal";
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dd3ec43..95e3cad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -446,6 +446,7 @@ public class BlockManager implements BlockStatsMXBean {
private final boolean storagePolicyEnabled;
private StoragePolicySatisfierMode spsMode;
private SPSPathIds spsPaths;
+ private final int spsOutstandingPathsLimit;
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -484,14 +485,16 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
-
+ // StoragePolicySatisfier(SPS) configs
storagePolicyEnabled =
conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
- String spsModeVal =
- conf.get(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+ String spsModeVal = conf.get(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+ spsOutstandingPathsLimit = conf.getInt(
+ DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
+ DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
spsMode = StoragePolicySatisfierMode.fromString(spsModeVal);
spsPaths = new SPSPathIds();
sps = new StoragePolicySatisfier(conf);
@@ -5181,6 +5184,12 @@ public class BlockManager implements BlockStatsMXBean {
*/
public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
String path) throws IOException {
+ if (spsMode != StoragePolicySatisfierMode.INTERNAL) {
+ LOG.debug("Satisfier is not running inside namenode, so status "
+ + "can't be returned.");
+ throw new IOException("Satisfier is not running inside namenode, "
+ + "so status can't be returned.");
+ }
return sps.checkStoragePolicySatisfyPathStatus(path);
}
@@ -5200,6 +5209,20 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
+ * Verify that satisfier queue limit exceeds allowed outstanding limit.
+ */
+ public void verifyOutstandingSPSPathQLimit() throws IOException {
+ long size = spsPaths.size();
+ // Checking that the SPS call Q exceeds the allowed limit.
+ if (spsOutstandingPathsLimit - size <= 0) {
+ LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
+ spsOutstandingPathsLimit, size);
+ throw new IOException("Outstanding satisfier queue limit: "
+ + spsOutstandingPathsLimit + " exceeded, try later!");
+ }
+ }
+
+ /**
* Removes the SPS path id from the list of sps paths.
*/
public void removeSPSPathId(long trackId) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index eed6e52..5ffd6e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -45,6 +45,21 @@ final class FSDirSatisfyStoragePolicyOp {
private FSDirSatisfyStoragePolicyOp() {
}
+ /**
+ * Satisfy storage policy function which will add the entry to SPS call queue
+ * and will perform satisfaction async way.
+ *
+ * @param fsd
+ * fs directory
+ * @param bm
+ * block manager
+ * @param src
+ * source path
+ * @param logRetryCache
+ * whether to record RPC ids in editlog for retry cache rebuilding
+ * @return file status info
+ * @throws IOException
+ */
static FileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
String src, boolean logRetryCache) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 5871a7f..eea4ab7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2234,28 +2234,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "satisfyStoragePolicy";
FileStatus auditStat;
+ validateStoragePolicySatisfy();
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot satisfy storage policy for " + src);
- // make sure storage policy is enabled, otherwise
- // there is no need to satisfy storage policy.
- if (!dir.isStoragePolicyEnabled()) {
- throw new IOException(String.format(
- "Failed to satisfy storage policy since %s is set to false.",
- DFS_STORAGE_POLICY_ENABLED_KEY));
- }
-
- if (!blockManager.isSPSEnabled()
- || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
- && !blockManager.getStoragePolicySatisfier().isRunning())) {
- throw new UnsupportedActionException(
- "Cannot request to satisfy storage policy "
- + "when storage policy satisfier feature has been disabled"
- + " by admin. Seek for an admin help to enable it "
- + "or use Mover tool.");
- }
auditStat = FSDirSatisfyStoragePolicyOp.satisfyStoragePolicy(
dir, blockManager, src, logRetryCache);
} catch (AccessControlException e) {
@@ -2268,6 +2252,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
logAuditEvent(true, operationName, src, null, auditStat);
}
+ private void validateStoragePolicySatisfy()
+ throws UnsupportedActionException, IOException {
+ // make sure storage policy is enabled, otherwise
+ // there is no need to satisfy storage policy.
+ if (!dir.isStoragePolicyEnabled()) {
+ throw new IOException(String.format(
+ "Failed to satisfy storage policy since %s is set to false.",
+ DFS_STORAGE_POLICY_ENABLED_KEY));
+ }
+ // checks sps status
+ if (!blockManager.isSPSEnabled()
+ || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
+ && !blockManager.getStoragePolicySatisfier().isRunning())) {
+ throw new UnsupportedActionException(
+ "Cannot request to satisfy storage policy "
+ + "when storage policy satisfier feature has been disabled"
+ + " by admin. Seek for an admin help to enable it "
+ + "or use Mover tool.");
+ }
+ // checks SPS Q has many outstanding requests.
+ blockManager.verifyOutstandingSPSPathQLimit();
+ }
+
/**
* unset storage policy set for a given file or a directory.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index ad1983f..57b1ded 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -109,6 +109,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -2547,6 +2548,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
+ // Check that internal SPS service is running
+ if (namesystem.getBlockManager()
+ .getSPSMode() == StoragePolicySatisfierMode.INTERNAL
+ && namesystem.getBlockManager().getSPSService().isRunning()) {
+ LOG.debug("SPS service is internally enabled and running inside "
+ + "namenode, so external SPS is not allowed to fetch the path Ids");
+ throw new IOException("SPS service is internally enabled and running"
+ + " inside namenode, so external SPS is not allowed to fetch"
+ + " the path Ids");
+ }
return namesystem.getBlockManager().getNextSPSPathId();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
index e0f4999..6c0f8b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability;
public class SPSPathIds {
// List of pending dir to satisfy the policy
- // TODO: Make this bounded queue.
private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
/**
@@ -61,4 +60,11 @@ public class SPSPathIds {
public synchronized Long pollNext() {
return spsDirsToBeTraveresed.poll();
}
+
+ /**
+ * @return the size of the queue.
+ */
+ public synchronized long size() {
+ return spsDirsToBeTraveresed.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 4ddfe2e..87faced 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -91,7 +91,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
private int blockMovementMaxRetry;
private Context ctxt;
private BlockMoveTaskHandler blockMoveTaskHandler;
- private Configuration conf;
+ private final Configuration conf;
public StoragePolicySatisfier(Configuration conf) {
this.conf = conf;
@@ -441,8 +441,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
liveDns, ecPolicy);
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
- } else
- if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
+ } else if (status !=
+ BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
// Check if the previous block was successfully paired. Here the
// status will set to NO_BLOCKS_TARGETS_PAIRED only when none of the
// blocks of a file found its eligible targets to satisfy the storage
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index e3b3bbb..c309209 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -175,6 +175,10 @@ public class ExternalSPSContext implements Context {
@Override
public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
long estimatedSize) {
+ // TODO: Instead of calling namenode for checking the available space, it
+ // can be optimized by maintaining local cache of datanode storage report
+ // and do the computations. This local cache can be refreshed per file or
+ // periodic fashion.
try {
return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
estimatedSize);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index c64abc3..59935b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.sps;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
@@ -28,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -36,6 +38,9 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,20 +49,25 @@ import org.slf4j.LoggerFactory;
* This class starts and runs external SPS service.
*/
@InterfaceAudience.Private
-public class ExternalStoragePolicySatisfier {
+public final class ExternalStoragePolicySatisfier {
public static final Logger LOG = LoggerFactory
.getLogger(ExternalStoragePolicySatisfier.class);
+ private ExternalStoragePolicySatisfier() {
+ // This is just a class to start and run external sps.
+ }
+
/**
* Main method to start SPS service.
*/
- public static void main(String args[]) throws Exception {
+ public static void main(String[] args) throws Exception {
NameNodeConnector nnc = null;
try {
StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
LOG);
HdfsConfiguration spsConf = new HdfsConfiguration();
- //TODO : login with SPS keytab
+ // login with SPS keytab
+ secureLogin(spsConf);
StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
nnc = getNameNodeConnector(spsConf);
@@ -92,6 +102,18 @@ public class ExternalStoragePolicySatisfier {
}
}
+ private static void secureLogin(Configuration conf)
+ throws IOException {
+ UserGroupInformation.setConfiguration(conf);
+ String addr = conf.get(DFSConfigKeys.DFS_SPS_ADDRESS_KEY,
+ DFSConfigKeys.DFS_SPS_ADDRESS_DEFAULT);
+ InetSocketAddress socAddr = NetUtils.createSocketAddr(addr, 0,
+ DFSConfigKeys.DFS_SPS_ADDRESS_KEY);
+ SecurityUtil.login(conf, DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY,
+ DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY,
+ socAddr.getHostName());
+ }
+
private static NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException, InterruptedException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
@@ -100,7 +122,7 @@ public class ExternalStoragePolicySatisfier {
try {
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(namenodes,
- StoragePolicySatisfier.class.getSimpleName(),
+ ExternalStoragePolicySatisfier.class.getSimpleName(),
externalSPSPathId, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 135d996..9e0a39f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -134,8 +134,9 @@ public class TestStoragePolicySatisfier {
*
* @throws IOException
*/
- public void getFS() throws IOException {
+ public DistributedFileSystem getFS() throws IOException {
this.dfs = hdfsCluster.getFileSystem();
+ return this.dfs;
}
@After
@@ -423,9 +424,9 @@ public class TestStoragePolicySatisfier {
+ "for %s since %s is set to false.",
FILE, DFS_STORAGE_POLICY_ENABLED_KEY));
} catch (IOException e) {
- Assert.assertTrue(e.getMessage().contains(String.format(
+ GenericTestUtils.assertExceptionContains(String.format(
"Failed to satisfy storage policy since %s is set to false.",
- DFS_STORAGE_POLICY_ENABLED_KEY)));
+ DFS_STORAGE_POLICY_ENABLED_KEY), e);
}
hdfsCluster.getConfiguration(0).
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3247dc52/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index febc2ea..15a4271 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -17,17 +17,40 @@
*/
package org.apache.hadoop.hdfs.server.sps;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+
+import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
@@ -39,8 +62,17 @@ import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
+import org.junit.Test;
/**
* Tests the external sps service plugins.
@@ -52,6 +84,18 @@ public class TestExternalStoragePolicySatisfier
{StorageType.DISK, StorageType.DISK},
{StorageType.DISK, StorageType.DISK}};
private NameNodeConnector nnc;
+ private File keytabFile;
+ private String principal;
+ private MiniKdc kdc;
+ private File baseDir;
+
+ @After
+ public void destroy() throws Exception {
+ if (kdc != null) {
+ kdc.stop();
+ FileUtil.fullyDelete(baseDir);
+ }
+ }
@Override
public void setUp() {
@@ -178,20 +222,157 @@ public class TestExternalStoragePolicySatisfier
}
}
+ private void initSecureConf(Configuration conf) throws Exception {
+ String username = "externalSPS";
+ baseDir = GenericTestUtils
+ .getTestDir(TestExternalStoragePolicySatisfier.class.getSimpleName());
+ FileUtil.fullyDelete(baseDir);
+ Assert.assertTrue(baseDir.mkdirs());
+
+ Properties kdcConf = MiniKdc.createConf();
+ kdc = new MiniKdc(kdcConf, baseDir);
+ kdc.start();
+
+ SecurityUtil.setAuthenticationMethod(
+ UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+ UserGroupInformation.setConfiguration(conf);
+ KerberosName.resetDefaultRealm();
+ Assert.assertTrue("Expected configuration to enable security",
+ UserGroupInformation.isSecurityEnabled());
+
+ keytabFile = new File(baseDir, username + ".keytab");
+ String keytab = keytabFile.getAbsolutePath();
+ // Windows will not reverse name lookup "127.0.0.1" to "localhost".
+ String krbInstance = Path.WINDOWS ? "127.0.0.1" : "localhost";
+ principal = username + "/" + krbInstance + "@" + kdc.getRealm();
+ String spnegoPrincipal = "HTTP/" + krbInstance + "@" + kdc.getRealm();
+ kdc.createPrincipal(keytabFile, username, username + "/" + krbInstance,
+ "HTTP/" + krbInstance);
+
+ conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, principal);
+ conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+ conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, principal);
+ conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+ conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
+ conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+ conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+ conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+
+ conf.set(DFS_SPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFS_SPS_KEYTAB_FILE_KEY, keytab);
+ conf.set(DFS_SPS_KERBEROS_PRINCIPAL_KEY, principal);
+
+ String keystoresDir = baseDir.getAbsolutePath();
+ String sslConfDir = KeyStoreTestUtil
+ .getClasspathDir(TestExternalStoragePolicySatisfier.class);
+ KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
+
+ conf.set(DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+ KeyStoreTestUtil.getClientSSLConfigFileName());
+ conf.set(DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+ KeyStoreTestUtil.getServerSSLConfigFileName());
+
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ }
+
/**
- * This test need not run as external scan is not a batch based scanning right
- * now.
+ * Test SPS runs fine when logging in with a keytab in kerberized env. Reusing
+ * testWhenStoragePolicySetToALLSSD here for basic functionality testing.
*/
- @Ignore("ExternalFileIdCollector is not batch based right now."
- + " So, ignoring it.")
- public void testBatchProcessingForSPSDirectory() throws Exception {
+ @Test(timeout = 300000)
+ public void testWithKeytabs() throws Exception {
+ try {
+ initSecureConf(getConf());
+ final UserGroupInformation ugi = UserGroupInformation
+ .loginUserFromKeytabAndReturnUGI(principal,
+ keytabFile.getAbsolutePath());
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ // verify that sps runs Ok.
+ testWhenStoragePolicySetToALLSSD();
+ // verify that UGI was logged in using keytab.
+ Assert.assertTrue(UserGroupInformation.isLoginKeytabBased());
+ return null;
+ }
+ });
+ } finally {
+ // Reset UGI so that other tests are not affected.
+ UserGroupInformation.reset();
+ UserGroupInformation.setConfiguration(new Configuration());
+ }
}
/**
- * Status won't be supported for external SPS, now. So, ignoring it.
+ * Test verifies that SPS call will throw exception if the call Q exceeds
+ * OutstandingQueueLimit value.
+ *
+ * @throws Exception
*/
- @Ignore("Status is not supported for external SPS. So, ignoring it.")
+ @Test(timeout = 300000)
+ public void testOutstandingQueueLimitExceeds() throws Exception {
+ try {
+ getConf().setInt(DFS_SPS_MAX_OUTSTANDING_PATHS_KEY, 3);
+ createCluster();
+ List<String> files = new ArrayList<>();
+ files.add(FILE);
+ DistributedFileSystem fs = getFS();
+ BlockManager blkMgr = getCluster().getNameNode().getNamesystem()
+ .getBlockManager();
+ SPSService spsService = blkMgr.getSPSService();
+ spsService.stopGracefully(); // stops SPS
+
+ // Creates 4 more files. Send all of them for satisfying the storage
+ // policy together.
+ for (int i = 0; i < 3; i++) {
+ String file1 = "/testOutstandingQueueLimitExceeds_" + i;
+ files.add(file1);
+ writeContent(file1);
+ fs.satisfyStoragePolicy(new Path(file1));
+ }
+ String fileExceeds = "/testOutstandingQueueLimitExceeds_" + 4;
+ files.add(fileExceeds);
+ writeContent(fileExceeds);
+ try {
+ fs.satisfyStoragePolicy(new Path(fileExceeds));
+ Assert.fail("Should throw exception as it exceeds "
+ + "outstanding SPS call Q limit");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "Outstanding satisfier queue limit: 3 exceeded, try later!", ioe);
+ }
+ } finally {
+ shutdownCluster();
+ }
+ }
+
+ /**
+ * Test verifies status check when Satisfier is not running inside namenode.
+ */
+ @Test(timeout = 90000)
public void testStoragePolicySatisfyPathStatus() throws Exception {
+ createCluster();
+ DistributedFileSystem fs = getFS();
+ try {
+ fs.getClient().checkStoragePolicySatisfyPathStatus(FILE);
+ Assert.fail("Should throw exception as SPS is not running inside NN!");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains("Satisfier is not running"
+ + " inside namenode, so status can't be returned.", e);
+ }
+ }
+
+ /**
+ * This test need not run as external scan is not a batch based scanning right
+ * now.
+ */
+ @Ignore("ExternalFileIdCollector is not batch based right now."
+ + " So, ignoring it.")
+ public void testBatchProcessingForSPSDirectory() throws Exception {
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org