You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/02/07 10:29:48 UTC
[2/2] hadoop git commit: HDFS-13097: [SPS]: Fix the branch review
comments(Part1). Contributed by Surendra Singh.
HDFS-13097: [SPS]: Fix the branch review comments(Part1). Contributed by Surendra Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a42e7a5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a42e7a5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a42e7a5
Branch: refs/heads/HDFS-10285
Commit: 4a42e7a5f222375ff3108bf9e0c15193249b8b0f
Parents: 3247dc5
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Wed Feb 7 02:28:23 2018 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Wed Feb 7 02:28:23 2018 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 4 +-
.../hadoop/hdfs/protocol/ClientProtocol.java | 6 +-
.../ClientNamenodeProtocolTranslatorPB.java | 14 +-
.../src/main/proto/ClientNamenodeProtocol.proto | 8 +-
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 61 ---
...tNamenodeProtocolServerSideTranslatorPB.java | 16 +-
.../server/blockmanagement/BlockManager.java | 255 +-----------
.../blockmanagement/DatanodeDescriptor.java | 33 +-
.../hdfs/server/common/HdfsServerConstants.java | 2 +-
.../datanode/StoragePolicySatisfyWorker.java | 15 +-
.../federation/router/RouterRpcServer.java | 2 +-
.../apache/hadoop/hdfs/server/mover/Mover.java | 2 +-
.../namenode/FSDirSatisfyStoragePolicyOp.java | 26 +-
.../server/namenode/FSDirStatAndListingOp.java | 1 -
.../hdfs/server/namenode/FSDirXAttrOp.java | 2 +-
.../hdfs/server/namenode/FSDirectory.java | 2 +-
.../hdfs/server/namenode/FSNamesystem.java | 46 +--
.../hadoop/hdfs/server/namenode/NameNode.java | 30 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 21 +-
.../sps/BlockStorageMovementNeeded.java | 4 +-
.../namenode/sps/IntraSPSNameNodeContext.java | 6 +-
.../hdfs/server/namenode/sps/SPSPathIds.java | 70 ----
.../hdfs/server/namenode/sps/SPSService.java | 10 +-
.../namenode/sps/StoragePolicySatisfier.java | 137 ++++---
.../sps/StoragePolicySatisfyManager.java | 399 +++++++++++++++++++
.../sps/ExternalStoragePolicySatisfier.java | 2 +-
.../hadoop/hdfs/tools/StoragePolicyAdmin.java | 2 +-
.../namenode/TestNameNodeReconfigure.java | 19 +-
.../TestPersistentStoragePolicySatisfier.java | 3 +-
.../TestStoragePolicySatisfierWithHA.java | 6 +-
.../sps/TestStoragePolicySatisfier.java | 35 +-
...stStoragePolicySatisfierWithStripedFile.java | 6 +-
.../sps/TestExternalStoragePolicySatisfier.java | 24 +-
33 files changed, 665 insertions(+), 604 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 489a04e..d169e82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -3096,8 +3096,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
- public boolean isStoragePolicySatisfierRunning() throws IOException {
- return namenode.isStoragePolicySatisfierRunning();
+ public boolean isInternalSatisfierRunning() throws IOException {
+ return namenode.isInternalSatisfierRunning();
}
Tracer getTracer() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 9487195..bc3b8d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1751,12 +1751,12 @@ public interface ClientProtocol {
void satisfyStoragePolicy(String path) throws IOException;
/**
- * Check if StoragePolicySatisfier is running.
- * @return true if StoragePolicySatisfier is running
+ * Check if internal StoragePolicySatisfier is running.
+ * @return true if internal StoragePolicySatisfier is running
* @throws IOException
*/
@Idempotent
- boolean isStoragePolicySatisfierRunning() throws IOException;
+ boolean isInternalSatisfierRunning() throws IOException;
/**
* Check the storage policy satisfy status of the path for which
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index afc48b8..9dbddf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -150,8 +150,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@@ -295,8 +295,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
private final static GetErasureCodingCodecsRequestProto
VOID_GET_EC_CODEC_REQUEST = GetErasureCodingCodecsRequestProto
.newBuilder().build();
- private final static IsStoragePolicySatisfierRunningRequestProto
- VOID_IS_SPS_RUNNING_REQUEST = IsStoragePolicySatisfierRunningRequestProto
+ private final static IsInternalSatisfierRunningRequestProto
+ VOID_IS_SPS_RUNNING_REQUEST = IsInternalSatisfierRunningRequestProto
.newBuilder().build();
@@ -1895,10 +1895,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public boolean isStoragePolicySatisfierRunning() throws IOException {
+ public boolean isInternalSatisfierRunning() throws IOException {
try {
- IsStoragePolicySatisfierRunningResponseProto rep =
- rpcProxy.isStoragePolicySatisfierRunning(null,
+ IsInternalSatisfierRunningResponseProto rep =
+ rpcProxy.isInternalSatisfierRunning(null,
VOID_IS_SPS_RUNNING_REQUEST);
return rep.getRunning();
} catch (ServiceException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index ee03c0d..4e26995 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -827,10 +827,10 @@ message SatisfyStoragePolicyResponseProto {
}
-message IsStoragePolicySatisfierRunningRequestProto { // no parameters
+message IsInternalSatisfierRunningRequestProto { // no parameters
}
-message IsStoragePolicySatisfierRunningResponseProto {
+message IsInternalSatisfierRunningResponseProto {
required bool running = 1;
}
@@ -1035,8 +1035,8 @@ service ClientNamenodeProtocol {
returns(ListOpenFilesResponseProto);
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
returns(SatisfyStoragePolicyResponseProto);
- rpc isStoragePolicySatisfierRunning(IsStoragePolicySatisfierRunningRequestProto)
- returns(IsStoragePolicySatisfierRunningResponseProto);
+ rpc isInternalSatisfierRunning(IsInternalSatisfierRunningRequestProto)
+ returns(IsInternalSatisfierRunningResponseProto);
rpc checkStoragePolicySatisfyPathStatus(CheckStoragePolicySatisfyPathStatusRequestProto)
returns(CheckStoragePolicySatisfyPathStatusResponseProto);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 570b85d..2ad1546 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -53,7 +53,6 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -74,7 +73,6 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1424,26 +1422,6 @@ public class DFSUtil {
}
/**
- * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
- * configuration.
- *
- * @param conf Configuration
- * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
- */
- public static int getSPSWorkMultiplier(Configuration conf) {
- int spsWorkMultiplier = conf
- .getInt(
- DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
- DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
- Preconditions.checkArgument(
- (spsWorkMultiplier > 0),
- DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
- " = '" + spsWorkMultiplier + "' is invalid. " +
- "It should be a positive, non-zero integer value.");
- return spsWorkMultiplier;
- }
-
- /**
* Get SPNEGO keytab Key from configuration
*
* @param conf Configuration
@@ -1703,43 +1681,4 @@ public class DFSUtil {
}
return id;
}
-
- /**
- * Remove the overlap between the expected types and the existing types.
- *
- * @param expected
- * - Expected storage types list.
- * @param existing
- * - Existing storage types list.
- * @param ignoreNonMovable
- * ignore non-movable storage types by removing them from both
- * expected and existing storage type list to prevent non-movable
- * storage from being moved.
- * @returns if the existing types or the expected types is empty after
- * removing the overlap.
- */
- public static boolean removeOverlapBetweenStorageTypes(
- List<StorageType> expected,
- List<StorageType> existing, boolean ignoreNonMovable) {
- for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
- final StorageType t = i.next();
- if (expected.remove(t)) {
- i.remove();
- }
- }
- if (ignoreNonMovable) {
- removeNonMovable(existing);
- removeNonMovable(expected);
- }
- return expected.isEmpty() || existing.isEmpty();
- }
-
- private static void removeNonMovable(List<StorageType> types) {
- for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
- final StorageType t = i.next();
- if (!t.isMovable()) {
- i.remove();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index ac7bd12..5560aef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -160,8 +160,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsStoragePolicySatisfierRunningResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsInternalSatisfierRunningResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
@@ -1848,14 +1848,14 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
}
@Override
- public IsStoragePolicySatisfierRunningResponseProto
- isStoragePolicySatisfierRunning(RpcController controller,
- IsStoragePolicySatisfierRunningRequestProto req)
+ public IsInternalSatisfierRunningResponseProto
+ isInternalSatisfierRunning(RpcController controller,
+ IsInternalSatisfierRunningRequestProto req)
throws ServiceException {
try {
- boolean ret = server.isStoragePolicySatisfierRunning();
- IsStoragePolicySatisfierRunningResponseProto.Builder builder =
- IsStoragePolicySatisfierRunningResponseProto.newBuilder();
+ boolean ret = server.isInternalSatisfierRunning();
+ IsInternalSatisfierRunningResponseProto.Builder builder =
+ IsInternalSatisfierRunningResponseProto.newBuilder();
builder.setRunning(ret);
return builder.build();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 95e3cad..d71940b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -69,8 +69,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -94,12 +92,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
-import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -442,11 +435,7 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockIdManager blockIdManager;
/** For satisfying block storage policies. */
- private final StoragePolicySatisfier sps;
- private final boolean storagePolicyEnabled;
- private StoragePolicySatisfierMode spsMode;
- private SPSPathIds spsPaths;
- private final int spsOutstandingPathsLimit;
+ private final StoragePolicySatisfyManager spsManager;
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
@@ -485,19 +474,10 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
- // StoragePolicySatisfier(SPS) configs
- storagePolicyEnabled =
- conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
- String spsModeVal = conf.get(
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
- spsOutstandingPathsLimit = conf.getInt(
- DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
- DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
- spsMode = StoragePolicySatisfierMode.fromString(spsModeVal);
- spsPaths = new SPSPathIds();
- sps = new StoragePolicySatisfier(conf);
+
+ // sps manager manages the user invoked sps paths and does the movement.
+ spsManager = new StoragePolicySatisfyManager(conf, namesystem, this);
+
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -728,7 +708,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public void close() {
- stopSPS(false);
+ getSPSManager().stop();
bmSafeMode.close();
try {
redundancyThread.interrupt();
@@ -742,7 +722,7 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.close();
pendingReconstruction.stop();
blocksMap.close();
- stopSPSGracefully();
+ getSPSManager().stopGracefully();
}
/** @return the datanodeManager */
@@ -5039,222 +5019,9 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Gets the storage policy satisfier instance.
- *
- * @return sps
- */
- public StoragePolicySatisfier getStoragePolicySatisfier() {
- return sps;
- }
-
- /**
- * Start storage policy satisfier service.
- */
- public void startSPS() {
- if (!(storagePolicyEnabled && spsMode != StoragePolicySatisfierMode.NONE)) {
- LOG.info(
- "Failed to start StoragePolicySatisfier "
- + " as {} set to {} and {} set to {}.",
- DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled,
- DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, spsMode);
- return;
- } else if (sps.isRunning()) {
- LOG.info("Storage policy satisfier is already running"
- + " as internal service.");
- return;
- }
- // starting internal SPS service
- if (spsMode == StoragePolicySatisfierMode.INTERNAL) {
- sps.start(false, spsMode);
- }
- }
-
- /**
- * Stop storage policy satisfier service.
- *
- * @param forceStop
- * true represents that it should stop SPS service by clearing all
- * pending SPS work
- */
- public void stopSPS(boolean forceStop) {
- if (!(storagePolicyEnabled
- && (spsMode != StoragePolicySatisfierMode.NONE))) {
- LOG.info("Storage policy satisfier is not enabled.");
- return;
- } else if (!sps.isRunning()) {
- removeAllSPSPathIds();
- LOG.info("Storage policy satisfier is not running.");
- return;
- }
-
- sps.disable(forceStop);
- }
-
- /**
- * Enable storage policy satisfier by starting its service.
- */
- public void enableInternalSPS() {
- if (!storagePolicyEnabled){
- LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.",
- DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
- return;
- }
- if (sps.isRunning()) {
- LOG.info("Storage policy satisfier is already running as SPS mode:{}.",
- spsMode);
- return;
- }
- updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
- sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps),
- new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
- sps),
- new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null);
- sps.start(true, spsMode);
- }
-
-
-
- /**
- * Enable storage policy satisfier by starting its service.
- */
- public void enableExternalSPS() {
- if (!storagePolicyEnabled){
- LOG.info("Failed to start StoragePolicySatisfier as {} set to {}.",
- DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
- return;
- }
- if (spsMode == StoragePolicySatisfierMode.EXTERNAL) {
- LOG.info("Storage policy satisfier is already enabled as SPS mode:{}.",
- spsMode);
- return;
- }
- updateSPSMode(StoragePolicySatisfierMode.EXTERNAL);
- sps.stopGracefully();
- }
-
- private void updateSPSMode(StoragePolicySatisfierMode newSpsMode) {
- LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
- spsMode, newSpsMode);
- spsMode = newSpsMode;
- }
-
- /**
- * Disable the storage policy satisfier by stopping its services.
- */
- public void disableSPS() {
- switch (spsMode) {
- case NONE:
- break;
- case INTERNAL:
- case EXTERNAL:
- if (!sps.isRunning()) {
- LOG.info("Storage policy satisfier is already stopped.");
- } else {
- LOG.info("Stopping StoragePolicySatisfier mode {}, as admin "
- + "requested to stop it.", spsMode);
- sps.disable(true);
- }
- removeAllSPSPathIds();
- break;
- default:
- // nothing
- break;
- }
- updateSPSMode(StoragePolicySatisfierMode.NONE);
- }
-
- /**
- * Timed wait to stop storage policy satisfier daemon threads.
- */
- public void stopSPSGracefully() {
- removeAllSPSPathIds();
- sps.stopGracefully();
- }
- /**
- * @return True if storage policy satisfier running.
- */
- public boolean isStoragePolicySatisfierRunning() {
- return sps.isRunning();
- }
-
- /**
- * @return status
- * Storage policy satisfy status of the path.
- * @throws IOException
- */
- public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
- String path) throws IOException {
- if (spsMode != StoragePolicySatisfierMode.INTERNAL) {
- LOG.debug("Satisfier is not running inside namenode, so status "
- + "can't be returned.");
- throw new IOException("Satisfier is not running inside namenode, "
- + "so status can't be returned.");
- }
- return sps.checkStoragePolicySatisfyPathStatus(path);
- }
-
- /**
- * @return SPS service instance.
- */
- public SPSService getSPSService() {
- return this.sps;
- }
-
- /**
- * @return the next SPS path id, on which path users has invoked to satisfy
- * storages.
- */
- public Long getNextSPSPathId() {
- return spsPaths.pollNext();
- }
-
- /**
- * Verify that satisfier queue limit exceeds allowed outstanding limit.
- */
- public void verifyOutstandingSPSPathQLimit() throws IOException {
- long size = spsPaths.size();
- // Checking that the SPS call Q exceeds the allowed limit.
- if (spsOutstandingPathsLimit - size <= 0) {
- LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
- spsOutstandingPathsLimit, size);
- throw new IOException("Outstanding satisfier queue limit: "
- + spsOutstandingPathsLimit + " exceeded, try later!");
- }
- }
-
- /**
- * Removes the SPS path id from the list of sps paths.
- */
- public void removeSPSPathId(long trackId) {
- spsPaths.remove(trackId);
- }
-
- /**
- * Clean up all sps path ids.
- */
- public void removeAllSPSPathIds() {
- spsPaths.clear();
- }
-
- /**
- * Adds the sps path to SPSPathIds list.
- */
- public void addSPSPathId(long id) {
- spsPaths.add(id);
- }
-
- /**
- * @return true if sps is running as an internal service or external service.
- */
- public boolean isSPSEnabled() {
- return spsMode == StoragePolicySatisfierMode.INTERNAL
- || spsMode == StoragePolicySatisfierMode.EXTERNAL;
- }
-
- /**
- * @return sps service mode.
+ * @return sps manager.
*/
- public StoragePolicySatisfierMode getSPSMode() {
- return spsMode;
+ public StoragePolicySatisfyManager getSPSManager() {
+ return spsManager;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index b09d908..24b948c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -211,8 +211,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
* A queue of blocks corresponding to trackID for moving its storage
* placements by this datanode.
*/
- private final Queue<BlockMovingInfo> storageMovementBlocks =
- new LinkedList<>();
+ private final BlockQueue<BlockMovingInfo> storageMovementBlocks =
+ new BlockQueue<>();
private volatile boolean dropSPSWork = false;
/* Variables for maintaining number of blocks scheduled to be written to
@@ -369,6 +369,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.pendingCached.clear();
this.cached.clear();
this.pendingUncached.clear();
+ this.storageMovementBlocks.clear();
}
public int numBlocks() {
@@ -1082,9 +1083,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
* - storage mismatched block info
*/
public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
- synchronized (storageMovementBlocks) {
- storageMovementBlocks.offer(blkMovingInfo);
- }
+ storageMovementBlocks.offer(blkMovingInfo);
+ BlockManager.LOG
+ .debug("Adding block move task " + blkMovingInfo + " to " + getName()
+ + ", current queue size is " + storageMovementBlocks.size());
}
/**
@@ -1101,23 +1103,18 @@ public class DatanodeDescriptor extends DatanodeInfo {
* total number of blocks which will be send to this datanode for
* block movement.
*
- * @return block infos which needs to move its storage locations.
+ * @return block infos which needs to move its storage locations or null if
+ * there is no block infos to move.
*/
public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
- synchronized (storageMovementBlocks) {
- List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- for (; !storageMovementBlocks.isEmpty()
- && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
- blockMovingInfos.add(storageMovementBlocks.poll());
- }
- BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
- .size()];
- blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
- if (blkMoveArray.length > 0) {
- return blkMoveArray;
- }
+ List<BlockMovingInfo> blockMovingInfos = storageMovementBlocks
+ .poll(numBlocksToMoveTasks);
+ if (blockMovingInfos == null || blockMovingInfos.size() <= 0) {
return null;
}
+ BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
+ .size()];
+ return blockMovingInfos.toArray(blkMoveArray);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 1378de2..c6e2263 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -365,7 +365,7 @@ public interface HdfsServerConstants {
String XATTR_ERASURECODING_POLICY =
"system.hdfs.erasurecoding.policy";
- String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps.xattr";
+ String XATTR_SATISFY_STORAGE_POLICY = "user.hdfs.sps";
Path MOVER_ID_PATH = new Path("/system/mover.id");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 42f2e93..af6137c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -75,9 +75,8 @@ public class StoragePolicySatisfyWorker {
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
-
- moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
- DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
+ // Defaulting to 10. This is to minimise the number of move ops.
+ moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
handler = new BlocksMovementsStatusHandler();
@@ -127,21 +126,13 @@ public class StoragePolicySatisfyWorker {
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
+
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
return t;
}
- }, new ThreadPoolExecutor.CallerRunsPolicy() {
- @Override
- public void rejectedExecution(Runnable runnable,
- ThreadPoolExecutor e) {
- LOG.info("Execution for block movement to satisfy storage policy"
- + " got rejected, Executing in current thread");
- // will run in the current thread.
- super.rejectedExecution(runnable, e);
- }
});
moverThreadPool.allowCoreThreadTimeOut(true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 52d3e23..931b58f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -2156,7 +2156,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
}
@Override
- public boolean isStoragePolicySatisfierRunning() throws IOException {
+ public boolean isInternalSatisfierRunning() throws IOException {
checkOperation(OperationCategory.READ, false);
return false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 2cc0e27..af5ab2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -661,7 +661,7 @@ public class Mover {
boolean spsRunning;
try {
spsRunning = nnc.getDistributedFileSystem().getClient()
- .isStoragePolicySatisfierRunning();
+ .isInternalSatisfierRunning();
} catch (RemoteException e) {
IOException cause = e.unwrapRemoteException();
if (cause instanceof StandbyException) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index 5ffd6e8..45d6218 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -75,24 +76,33 @@ final class FSDirSatisfyStoragePolicyOp {
fsd.checkPathAccess(pc, iip, FsAction.WRITE);
}
INode inode = FSDirectory.resolveLastINode(iip);
- if (inodeHasSatisfyXAttr(inode)) {
- throw new IOException(
- "Cannot request to call satisfy storage policy on path "
+ if (inode.isFile() && inode.asFile().numBlocks() == 0) {
+ if (NameNode.LOG.isInfoEnabled()) {
+ NameNode.LOG.info(
+ "Skipping satisfy storage policy on path:{} as "
+ + "this file doesn't have any blocks!",
+ inode.getFullPathName());
+ }
+ } else if (inodeHasSatisfyXAttr(inode)) {
+ NameNode.LOG
+ .warn("Cannot request to call satisfy storage policy on path: "
+ inode.getFullPathName()
+ ", as this file/dir was already called for satisfying "
+ "storage policy.");
- }
- if (unprotectedSatisfyStoragePolicy(inode, fsd)) {
+ } else {
XAttr satisfyXAttr = XAttrHelper
.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
- List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
- xAttrs.add(satisfyXAttr);
+ List<XAttr> xAttrs = Arrays.asList(satisfyXAttr);
List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs,
xAttrs, EnumSet.of(XAttrSetFlag.CREATE));
XAttrStorage.updateINodeXAttrs(inode, newXAttrs,
iip.getLatestSnapshotId());
fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+
+ // Adding directory in the pending queue, so FileInodeIdCollector
+ // process directory child in batch and recursively
+ fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
}
} finally {
fsd.writeUnlock();
@@ -106,7 +116,7 @@ final class FSDirSatisfyStoragePolicyOp {
} else {
// Adding directory in the pending queue, so FileInodeIdCollector process
// directory child in batch and recursively
- fsd.getBlockManager().addSPSPathId(inode.getId());
+ fsd.getBlockManager().getSPSManager().addPathId(inode.getId());
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index 517aae1..8b77034 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -89,7 +89,6 @@ class FSDirStatAndListingOp {
* @param srcArg The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
- * @param needLocation if blockLocations need to be returned
*
* @param needLocation Include {@link LocatedBlocks} in result.
* @param needBlockToken Include block tokens in {@link LocatedBlocks}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index 62d36c5..0d627f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -203,7 +203,7 @@ class FSDirXAttrOp {
for (XAttr xattr : toRemove) {
if (XATTR_SATISFY_STORAGE_POLICY
.equals(XAttrHelper.getPrefixedName(xattr))) {
- fsd.getBlockManager().getStoragePolicySatisfier()
+ fsd.getBlockManager().getSPSManager().getInternalSPSService()
.clearQueue(inode.getId());
break;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index f56502a..e6b8d66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1406,7 +1406,7 @@ public class FSDirectory implements Closeable {
if (!inode.isSymlink()) {
final XAttrFeature xaf = inode.getXAttrFeature();
addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
- if (namesystem.getBlockManager().isSPSEnabled()) {
+ if (namesystem.getBlockManager().getSPSManager().isEnabled()) {
addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index eea4ab7..f9d9637 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -259,10 +259,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
@@ -1291,13 +1288,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
edekCacheLoaderDelay, edekCacheLoaderInterval);
}
- blockManager.getSPSService().init(
- new IntraSPSNameNodeContext(this, blockManager,
- blockManager.getSPSService()),
- new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
- blockManager.getSPSService()),
- new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null);
- blockManager.startSPS();
+ blockManager.getSPSManager().start();
} finally {
startingActiveService = false;
blockManager.checkSafeMode();
@@ -1328,7 +1319,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
writeLock();
try {
if (blockManager != null) {
- blockManager.stopSPS(false);
+ blockManager.getSPSManager().stop();
}
stopSecretManager();
leaseManager.stopMonitor();
@@ -1368,7 +1359,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// Don't want to keep replication queues when not in Active.
blockManager.clearQueues();
blockManager.setInitializedReplQueues(false);
- blockManager.stopSPSGracefully();
+ blockManager.getSPSManager().stopGracefully();
}
} finally {
writeUnlock("stopActiveServices");
@@ -2262,17 +2253,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DFS_STORAGE_POLICY_ENABLED_KEY));
}
// checks sps status
- if (!blockManager.isSPSEnabled()
- || (blockManager.getSPSMode() == StoragePolicySatisfierMode.INTERNAL
- && !blockManager.getStoragePolicySatisfier().isRunning())) {
+ if (!blockManager.getSPSManager().isEnabled() || (blockManager
+ .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL
+ && !blockManager.getSPSManager().isInternalSatisfierRunning())) {
throw new UnsupportedActionException(
"Cannot request to satisfy storage policy "
+ "when storage policy satisfier feature has been disabled"
+ " by admin. Seek for an admin help to enable it "
+ "or use Mover tool.");
}
- // checks SPS Q has many outstanding requests.
- blockManager.verifyOutstandingSPSPathQLimit();
+ // checks SPS Q has many outstanding requests. It will throw IOException if
+ // the limit exceeds.
+ blockManager.getSPSManager().verifyOutstandingPathQLimit();
}
/**
@@ -3949,17 +3941,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
// Handle blocks movement results sent by the coordinator datanode.
- StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
- if (sps != null) {
- if (!sps.isRunning()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Storage policy satisfier is not running. So, ignoring storage"
- + " movement attempt finished block info sent by DN");
- }
- } else {
- sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
+ SPSService sps = blockManager.getSPSManager().getInternalSPSService();
+ if (!sps.isRunning()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Storage policy satisfier is not running. So, ignoring storage"
+ + " movement attempt finished block info sent by DN");
}
+ } else {
+ sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
}
//create ha status
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 58b8c1e..01b7f1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -2075,7 +2075,7 @@ public class NameNode extends ReconfigurableBase implements
} else if (property.equals(ipcClientRPCBackoffEnable)) {
return reconfigureIPCBackoffEnabled(newVal);
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
- return reconfigureSPSEnabled(newVal, property);
+ return reconfigureSPSModeEvent(newVal, property);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
@@ -2159,39 +2159,27 @@ public class NameNode extends ReconfigurableBase implements
return Boolean.toString(clientBackoffEnabled);
}
- String reconfigureSPSEnabled(String newVal, String property)
+ String reconfigureSPSModeEvent(String newVal, String property)
throws ReconfigurationException {
if (newVal == null
|| StoragePolicySatisfierMode.fromString(newVal) == null) {
throw new ReconfigurationException(property, newVal,
getConf().get(property),
new HadoopIllegalArgumentException(
- "For enabling or disabling storage policy satisfier, we must "
- + "pass either none/internal/external string value only"));
+ "For enabling or disabling storage policy satisfier, must "
+ + "pass either internal/external/none string value only"));
}
if (!isActiveState()) {
throw new ReconfigurationException(property, newVal,
- getConf().get(property), new HadoopIllegalArgumentException(
- "Enabling or disabling storage policy satisfier service on "
- + state + " NameNode is not allowed"));
+ getConf().get(property),
+ new HadoopIllegalArgumentException(
+ "Enabling or disabling storage policy satisfier service on "
+ + state + " NameNode is not allowed"));
}
StoragePolicySatisfierMode mode = StoragePolicySatisfierMode
.fromString(newVal);
- switch(mode){
- case NONE:
- namesystem.getBlockManager().disableSPS();
- break;
- case INTERNAL:
- namesystem.getBlockManager().enableInternalSPS();
- break;
- case EXTERNAL:
- namesystem.getBlockManager().enableExternalSPS();
- break;
- default:
- // nothing
- break;
- }
+ namesystem.getBlockManager().getSPSManager().changeModeEvent(mode);
return newVal;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57b1ded..c582eab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2505,15 +2505,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
- public boolean isStoragePolicySatisfierRunning() throws IOException {
+ public boolean isInternalSatisfierRunning() throws IOException {
checkNNStartup();
- String operationName = "isStoragePolicySatisfierRunning";
+ String operationName = "isInternalSatisfierRunning";
namesystem.checkSuperuserPrivilege(operationName);
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
- boolean isSPSRunning =
- namesystem.getBlockManager().isStoragePolicySatisfierRunning();
+ boolean isSPSRunning = namesystem.getBlockManager().getSPSManager()
+ .isInternalSatisfierRunning();
namesystem.logAuditEvent(true, operationName, null);
return isSPSRunning;
}
@@ -2525,8 +2525,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
- return namesystem.getBlockManager().checkStoragePolicySatisfyPathStatus(
- path);
+ return namesystem.getBlockManager().getSPSManager()
+ .checkStoragePolicySatisfyPathStatus(path);
}
@Override
@@ -2548,17 +2548,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.isStandbyState()) {
throw new StandbyException("Not supported by Standby Namenode.");
}
- // Check that internal SPS service is running
- if (namesystem.getBlockManager()
- .getSPSMode() == StoragePolicySatisfierMode.INTERNAL
- && namesystem.getBlockManager().getSPSService().isRunning()) {
+ // Check that SPS daemon service is running inside namenode
+ if (namesystem.getBlockManager().getSPSManager()
+ .getMode() == StoragePolicySatisfierMode.INTERNAL) {
LOG.debug("SPS service is internally enabled and running inside "
+ "namenode, so external SPS is not allowed to fetch the path Ids");
throw new IOException("SPS service is internally enabled and running"
+ " inside namenode, so external SPS is not allowed to fetch"
+ " the path Ids");
}
- return namesystem.getBlockManager().getNextSPSPathId();
+ return namesystem.getBlockManager().getSPSManager().getNextPathId();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 8a10183..c683a63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -311,7 +311,7 @@ public class BlockStorageMovementNeeded {
if (Time.monotonicNow()
- lastStatusCleanTime > statusClearanceElapsedTimeMs) {
lastStatusCleanTime = Time.monotonicNow();
- cleanSpsStatus();
+ cleanSPSStatus();
}
startINodeId = null; // Current inode id successfully scanned.
}
@@ -333,7 +333,7 @@ public class BlockStorageMovementNeeded {
}
}
- private synchronized void cleanSpsStatus() {
+ private synchronized void cleanSPSStatus() {
for (Iterator<Entry<Long, StoragePolicySatisfyPathStatusInfo>> it =
spsStatus.entrySet().iterator(); it.hasNext();) {
Entry<Long, StoragePolicySatisfyPathStatusInfo> entry = it.next();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index ff6cc21..495d1c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -178,17 +178,17 @@ public class IntraSPSNameNodeContext implements Context {
@Override
public Long getNextSPSPathId() {
- return blockManager.getNextSPSPathId();
+ return blockManager.getSPSManager().getNextPathId();
}
@Override
public void removeSPSPathId(long trackId) {
- blockManager.removeSPSPathId(trackId);
+ blockManager.getSPSManager().removePathId(trackId);
}
@Override
public void removeAllSPSPathIds() {
- blockManager.removeAllSPSPathIds();
+ blockManager.getSPSManager().removeAllPathIds();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
deleted file mode 100644
index 6c0f8b2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.sps;
-
-import java.util.LinkedList;
-import java.util.Queue;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * A class which holds the SPS invoked path ids.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public class SPSPathIds {
-
- // List of pending dir to satisfy the policy
- private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
-
- /**
- * Add the path id to queue.
- */
- public synchronized void add(long pathId) {
- spsDirsToBeTraveresed.add(pathId);
- }
-
- /**
- * Removes the path id.
- */
- public synchronized void remove(long pathId) {
- spsDirsToBeTraveresed.remove(pathId);
- }
-
- /**
- * Clears all path ids.
- */
- public synchronized void clear() {
- spsDirsToBeTraveresed.clear();
- }
-
- /**
- * @return next path id available in queue.
- */
- public synchronized Long pollNext() {
- return spsDirsToBeTraveresed.poll();
- }
-
- /**
- * @return the size of the queue.
- */
- public synchronized long size() {
- return spsDirsToBeTraveresed.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
index ceec3f3..da6e365 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -67,11 +67,12 @@ public interface SPSService {
void stopGracefully();
/**
- * Disable the SPS service.
+ * Stops the SPS service.
*
* @param forceStop
+ * true represents to clear all the sps path's hint, false otherwise.
*/
- void disable(boolean forceStop);
+ void stop(boolean forceStop);
/**
* Check whether StoragePolicySatisfier is running.
@@ -106,6 +107,11 @@ public interface SPSService {
int processingQueueSize();
/**
+ * Clear inodeId present in the processing queue.
+ */
+ void clearQueue(long inodeId);
+
+ /**
* @return the configuration.
*/
Configuration getConf();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 87faced..6b449aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -60,6 +59,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
* Setting storagePolicy on a file after the file write will only update the new
@@ -145,7 +145,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
new BlockStorageMovementAttemptedItems(this,
storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler;
- this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
+ this.spsWorkMultiplier = getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
@@ -163,8 +163,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
serviceMode);
return;
}
- isRunning = true;
- this.spsMode = serviceMode;
if (spsMode == StoragePolicySatisfierMode.INTERNAL
&& ctxt.isMoverRunning()) {
isRunning = false;
@@ -182,6 +180,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
StringUtils.toLowerCase(spsMode.toString()));
}
+ isRunning = true;
+ this.spsMode = serviceMode;
// Ensure that all the previously submitted block movements(if any) have to
// be stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
@@ -193,7 +193,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@Override
- public synchronized void disable(boolean forceStop) {
+ public synchronized void stop(boolean forceStop) {
isRunning = false;
if (storagePolicySatisfierThread == null) {
return;
@@ -214,19 +214,22 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
@Override
public synchronized void stopGracefully() {
if (isRunning) {
- disable(true);
+ stop(false);
}
if (this.storageMovementsMonitor != null) {
this.storageMovementsMonitor.stopGracefully();
}
- if (storagePolicySatisfierThread == null) {
- return;
- }
- try {
- storagePolicySatisfierThread.join(3000);
- } catch (InterruptedException ie) {
+ if (storagePolicySatisfierThread != null) {
+ try {
+ storagePolicySatisfierThread.join(3000);
+ } catch (InterruptedException ie) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted Exception while waiting to join sps thread,"
+ + " ignoring it", ie);
+ }
+ }
}
}
@@ -351,32 +354,26 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
Thread.sleep(3000);
blockCount = 0L;
}
+ } catch (IOException e) {
+ LOG.error("Exception during StoragePolicySatisfier execution - "
+ + "will continue next cycle", e);
} catch (Throwable t) {
- handleException(t);
- }
- }
- }
-
- private void handleException(Throwable t) {
- // double check to avoid entering into synchronized block.
- if (isRunning) {
- synchronized (this) {
- if (isRunning) {
- if (t instanceof InterruptedException) {
+ synchronized (this) {
+ if (isRunning) {
isRunning = false;
- LOG.info("Stopping StoragePolicySatisfier.");
+ if (t instanceof InterruptedException) {
+ LOG.info("Stopping StoragePolicySatisfier.", t);
+ } else {
+ LOG.error("StoragePolicySatisfier thread received "
+ + "runtime exception.", t);
+ }
// Stopping monitor thread and clearing queues as well
this.clearQueues();
this.storageMovementsMonitor.stopGracefully();
- } else {
- LOG.error(
- "StoragePolicySatisfier thread received runtime exception, "
- + "ignoring", t);
}
}
}
}
- return;
}
private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
@@ -434,7 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
List<StorageType> existing = new LinkedList<StorageType>(
Arrays.asList(blockInfo.getStorageTypes()));
- if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+ if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
blockInfo, expectedStorageTypes, existing, blockInfo.getLocations(),
@@ -499,7 +496,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
DatanodeInfo[] storages, DatanodeStorageReport[] liveDns,
ErasureCodingPolicy ecPolicy) {
boolean foundMatchingTargetNodesForBlock = true;
- if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+ if (!removeOverlapBetweenStorageTypes(expectedStorageTypes,
existing, true)) {
List<StorageTypeNodePair> sourceWithStorageMap =
new ArrayList<StorageTypeNodePair>();
@@ -881,21 +878,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
/**
- * Set file inode in queue for which storage movement needed for its blocks.
- *
- * @param inodeId
- * - file inode/blockcollection id.
- */
- public void satisfyStoragePolicy(Long inodeId) {
- //For file startId and trackId is same
- storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Added track info for inode {} to block "
- + "storageMovementNeeded queue", inodeId);
- }
- }
-
- /**
* Clear queues for given track id.
*/
public void clearQueue(long trackId) {
@@ -958,6 +940,10 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
@Override
public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) {
storageMovementNeeded.add(trackInfo, scanCompleted);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added track info for inode {} to block "
+ + "storageMovementNeeded queue", trackInfo.getFileId());
+ }
}
@Override
@@ -993,4 +979,63 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
//TODO Add join here on SPS rpc server also
storagePolicySatisfierThread.join();
}
+
+ /**
+ * Remove the overlap between the expected types and the existing types.
+ *
+ * @param expected
+ * - Expected storage types list.
+ * @param existing
+ * - Existing storage types list.
+ * @param ignoreNonMovable
+ * ignore non-movable storage types by removing them from both
+ * expected and existing storage type list to prevent non-movable
+ * storage from being moved.
+ * @returns if the existing types or the expected types is empty after
+ * removing the overlap.
+ */
+ private static boolean removeOverlapBetweenStorageTypes(
+ List<StorageType> expected,
+ List<StorageType> existing, boolean ignoreNonMovable) {
+ for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
+ final StorageType t = i.next();
+ if (expected.remove(t)) {
+ i.remove();
+ }
+ }
+ if (ignoreNonMovable) {
+ removeNonMovable(existing);
+ removeNonMovable(expected);
+ }
+ return expected.isEmpty() || existing.isEmpty();
+ }
+
+ private static void removeNonMovable(List<StorageType> types) {
+ for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
+ final StorageType t = i.next();
+ if (!t.isMovable()) {
+ i.remove();
+ }
+ }
+ }
+
+ /**
+ * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
+ * configuration.
+ *
+ * @param conf Configuration
+ * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
+ */
+ private static int getSPSWorkMultiplier(Configuration conf) {
+ int spsWorkMultiplier = conf
+ .getInt(
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+ Preconditions.checkArgument(
+ (spsWorkMultiplier > 0),
+ DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
+ " = '" + spsWorkMultiplier + "' is invalid. " +
+ "It should be a positive, non-zero integer value.");
+ return spsWorkMultiplier;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
new file mode 100644
index 0000000..5bdf6ae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfyManager.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This manages satisfy storage policy invoked path ids and expose methods to
+ * process these path ids. It maintains sps mode(INTERNAL/EXTERNAL/NONE)
+ * configured by the administrator.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
+ * it will start internal sps daemon service inside namenode and process sps
+ * invoked path ids to satisfy the storage policy.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
+ * it won't do anything, just maintains the sps invoked path ids. Administrator
+ * requires to start external sps service explicitly, to fetch the sps invoked
+ * path ids from namenode, then do necessary computations and block movement in
+ * order to satisfy the storage policy. Please refer
+ * {@link ExternalStoragePolicySatisfier} class to understand more about the
+ * external sps service functionality.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then it
+ * will disable the sps feature completely by clearing all queued up sps path's
+ * hint.
+ *
+ * This class is instantiated by the BlockManager.
+ */
+public class StoragePolicySatisfyManager {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(StoragePolicySatisfyManager.class);
+ private final StoragePolicySatisfier spsService;
+ private final boolean storagePolicyEnabled;
+ private volatile StoragePolicySatisfierMode mode;
+ private final Queue<Long> pathsToBeTraveresed;
+ private final int outstandingPathsLimit;
+ private final Namesystem namesystem;
+ private final BlockManager blkMgr;
+
+ public StoragePolicySatisfyManager(Configuration conf, Namesystem namesystem,
+ BlockManager blkMgr) {
+ // StoragePolicySatisfier(SPS) configs
+ storagePolicyEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT);
+ String modeVal = conf.get(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT);
+ outstandingPathsLimit = conf.getInt(
+ DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY,
+ DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_DEFAULT);
+ mode = StoragePolicySatisfierMode.fromString(modeVal);
+ pathsToBeTraveresed = new LinkedList<Long>();
+ // instantiate SPS service by just keeps config reference and not starting
+ // any supporting threads.
+ spsService = new StoragePolicySatisfier(conf);
+ this.namesystem = namesystem;
+ this.blkMgr = blkMgr;
+ }
+
+ /**
+ * This function will do following logic based on the configured sps mode:
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
+ * starts internal daemon service inside namenode.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
+ * it won't do anything. Administrator requires to start external sps service
+ * explicitly.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
+ * service is disabled and won't do any action.
+ */
+ public void start() {
+ if (!storagePolicyEnabled) {
+ LOG.info("Disabling StoragePolicySatisfier service as {} set to {}.",
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
+ return;
+ }
+
+ switch (mode) {
+ case INTERNAL:
+ if (spsService.isRunning()) {
+ LOG.info("Storage policy satisfier is already running"
+ + " as internal daemon service inside namenode.");
+ return;
+ }
+ // starts internal daemon service inside namenode
+ spsService.init(
+ new IntraSPSNameNodeContext(namesystem, blkMgr, spsService),
+ new IntraSPSNameNodeFileIdCollector(namesystem.getFSDirectory(),
+ spsService),
+ new IntraSPSNameNodeBlockMoveTaskHandler(blkMgr, namesystem), null);
+ spsService.start(false, mode);
+ break;
+ case EXTERNAL:
+ LOG.info("Storage policy satisfier is configured as external, "
+ + "please start external sps service explicitly to satisfy policy");
+ break;
+ case NONE:
+ LOG.info("Storage policy satisfier is disabled");
+ break;
+ default:
+ LOG.info("Given mode: {} is invalid", mode);
+ break;
+ }
+ }
+
+ /**
+ * This function will do following logic based on the configured sps mode:
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
+ * stops internal daemon service inside namenode.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
+ * it won't do anything. Administrator requires to stop external sps service
+ * explicitly, if needed.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
+ * service is disabled and won't do any action.
+ */
+ public void stop() {
+ if (!storagePolicyEnabled) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storage policy is not enabled, ignoring");
+ }
+ return;
+ }
+
+ switch (mode) {
+ case INTERNAL:
+ removeAllPathIds();
+ if (!spsService.isRunning()) {
+ LOG.info("Internal storage policy satisfier daemon service"
+ + " is not running");
+ return;
+ }
+ // stops internal daemon service running inside namenode
+ spsService.stop(false);
+ break;
+ case EXTERNAL:
+ removeAllPathIds();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Storage policy satisfier service is running outside namenode"
+ + ", ignoring");
+ }
+ break;
+ case NONE:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storage policy satisfier is not enabled, ignoring");
+ }
+ break;
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invalid mode:{}, ignoring", mode);
+ }
+ break;
+ }
+ }
+
+ /**
+ * Sets new sps mode. If the new mode is internal, then it will start internal
+ * sps service inside namenode. If the new mode is external, then stops
+ * internal sps service running(if any) inside namenode. If the new mode is
+ * none, then it will disable the sps feature completely by clearing all
+ * queued up sps path's hint.
+ */
+ public void changeModeEvent(StoragePolicySatisfierMode newMode) {
+ if (!storagePolicyEnabled) {
+ LOG.info("Failed to change storage policy satisfier as {} set to {}.",
+ DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, storagePolicyEnabled);
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating SPS service status, current mode:{}, new mode:{}",
+ mode, newMode);
+ }
+
+ switch (newMode) {
+ case INTERNAL:
+ if (spsService.isRunning()) {
+ LOG.info("Storage policy satisfier is already running as {} mode.",
+ mode);
+ return;
+ }
+ spsService.init(
+ new IntraSPSNameNodeContext(this.namesystem, this.blkMgr, spsService),
+ new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
+ spsService),
+ new IntraSPSNameNodeBlockMoveTaskHandler(this.blkMgr,
+ this.namesystem),
+ null);
+ spsService.start(true, newMode);
+ break;
+ case EXTERNAL:
+ if (mode == newMode) {
+ LOG.info("Storage policy satisfier is already in mode:{},"
+ + " so ignoring change mode event.", newMode);
+ return;
+ }
+ spsService.stopGracefully();
+ break;
+ case NONE:
+ if (mode == newMode) {
+ LOG.info("Storage policy satisfier is already disabled, mode:{}"
+ + " so ignoring change mode event.", newMode);
+ return;
+ }
+ LOG.info("Disabling StoragePolicySatisfier, mode:{}", newMode);
+ spsService.stop(true);
+ removeAllPathIds();
+ break;
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Given mode: {} is invalid", newMode);
+ }
+ break;
+ }
+
+ // update sps mode
+ mode = newMode;
+ }
+
+ /**
+ * This function will do following logic based on the configured sps mode:
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.INTERNAL}, then
+ * timed wait to stop internal storage policy satisfier daemon threads.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.EXTERNAL}, then
+ * it won't do anything, just ignore it.
+ *
+ * <p>
+ * If the configured mode is {@link StoragePolicySatisfierMode.NONE}, then the
+ * service is disabled. It won't do any action, just ignore it.
+ */
+ public void stopGracefully() {
+ switch (mode) {
+ case INTERNAL:
+ spsService.stopGracefully();
+ break;
+ case EXTERNAL:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring, StoragePolicySatisfier feature is running"
+ + " outside namenode");
+ }
+ break;
+ case NONE:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring, StoragePolicySatisfier feature is disabled");
+ }
+ break;
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Invalid mode:{}", mode);
+ }
+ break;
+ }
+ }
+
+ /**
+ * @return true if the internal storage policy satisfier daemon is running,
+ * false otherwise.
+ */
+ public boolean isInternalSatisfierRunning() {
+ return spsService.isRunning();
+ }
+
+ /**
+ * @return internal SPS service instance.
+ */
+ public SPSService getInternalSPSService() {
+ return this.spsService;
+ }
+
+ /**
+ * @return status Storage policy satisfy status of the path. It is supported
+ * only for the internal sps daemon service.
+ * @throws IOException
+ * if the Satisfier is not running inside namenode.
+ */
+ public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
+ String path) throws IOException {
+ if (mode != StoragePolicySatisfierMode.INTERNAL) {
+ LOG.debug("Satisfier is not running inside namenode, so status "
+ + "can't be returned.");
+ throw new IOException("Satisfier is not running inside namenode, "
+ + "so status can't be returned.");
+ }
+ return spsService.checkStoragePolicySatisfyPathStatus(path);
+ }
+
+ /**
+ * @return the next SPS path id, on which path users has invoked to satisfy
+ * storages.
+ */
+ public Long getNextPathId() {
+ synchronized (pathsToBeTraveresed) {
+ return pathsToBeTraveresed.poll();
+ }
+ }
+
+ /**
+ * Verify that satisfier queue limit exceeds allowed outstanding limit.
+ */
+ public void verifyOutstandingPathQLimit() throws IOException {
+ long size = pathsToBeTraveresed.size();
+ // Checking that the SPS call Q exceeds the allowed limit.
+ if (outstandingPathsLimit - size <= 0) {
+ LOG.debug("Satisifer Q - outstanding limit:{}, current size:{}",
+ outstandingPathsLimit, size);
+ throw new IOException("Outstanding satisfier queue limit: "
+ + outstandingPathsLimit + " exceeded, try later!");
+ }
+ }
+
+ /**
+ * Removes the SPS path id from the list of sps paths.
+ */
+ public void removePathId(long trackId) {
+ synchronized (pathsToBeTraveresed) {
+ pathsToBeTraveresed.remove(trackId);
+ }
+ }
+
+ /**
+ * Clean up all sps path ids.
+ */
+ public void removeAllPathIds() {
+ synchronized (pathsToBeTraveresed) {
+ pathsToBeTraveresed.clear();
+ }
+ }
+
+ /**
+ * Adds the sps path to SPSPathIds list.
+ */
+ public void addPathId(long id) {
+ synchronized (pathsToBeTraveresed) {
+ pathsToBeTraveresed.add(id);
+ }
+ }
+
+ /**
+ * @return true if sps is configured as an internal service or external
+ * service, false otherwise.
+ */
+ public boolean isEnabled() {
+ return mode == StoragePolicySatisfierMode.INTERNAL
+ || mode == StoragePolicySatisfierMode.EXTERNAL;
+ }
+
+ /**
+ * @return sps service mode.
+ */
+ public StoragePolicySatisfierMode getMode() {
+ return mode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
index 59935b6..33448db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -73,7 +73,7 @@ public final class ExternalStoragePolicySatisfier {
boolean spsRunning;
spsRunning = nnc.getDistributedFileSystem().getClient()
- .isStoragePolicySatisfierRunning();
+ .isInternalSatisfierRunning();
if (spsRunning) {
throw new RuntimeException(
"Startup failed due to StoragePolicySatisfier"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a42e7a5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
index 3a2ad48..d8392fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/StoragePolicyAdmin.java
@@ -374,7 +374,7 @@ public class StoragePolicyAdmin extends Configured implements Tool {
}
final DistributedFileSystem dfs = AdminHelper.getDFS(conf);
try {
- if(dfs.getClient().isStoragePolicySatisfierRunning()){
+ if(dfs.getClient().isInternalSatisfierRunning()){
System.out.println("yes");
}else{
System.out.println("no");
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org