You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/02/16 14:40:30 UTC
[39/50] [abbrv] hadoop git commit: HDFS-12911. [SPS]: Modularize the
SPS code and expose necessary interfaces for external/internal
implementations. Contributed by Uma Maheswara Rao G
HDFS-12911. [SPS]: Modularize the SPS code and expose necessary interfaces for external/internal implementations. Contributed by Uma Maheswara Rao G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e18821d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e18821d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e18821d7
Branch: refs/heads/HDFS-10285
Commit: e18821d7ae38ab7a8c70ea0c7071af3c3c378f3b
Parents: 307ffa5
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Fri Jan 19 08:51:49 2018 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Fri Feb 16 19:47:55 2018 +0530
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 61 +++++-
.../namenode/FSDirSatisfyStoragePolicyOp.java | 16 +-
.../hdfs/server/namenode/FSDirectory.java | 6 +-
.../hdfs/server/namenode/FSNamesystem.java | 10 +-
.../namenode/sps/BlockMoveTaskHandler.java | 44 ++++
.../namenode/sps/BlockMovementListener.java | 40 ++++
.../sps/BlockStorageMovementAttemptedItems.java | 28 +--
.../sps/BlockStorageMovementNeeded.java | 207 ++++---------------
.../hdfs/server/namenode/sps/Context.java | 43 ++--
.../server/namenode/sps/FileIdCollector.java | 43 ++++
.../IntraSPSNameNodeBlockMoveTaskHandler.java | 62 ++++++
.../namenode/sps/IntraSPSNameNodeContext.java | 62 ++----
.../sps/IntraSPSNameNodeFileIdCollector.java | 178 ++++++++++++++++
.../hdfs/server/namenode/sps/ItemInfo.java | 81 ++++++++
.../hdfs/server/namenode/sps/SPSPathIds.java | 63 ++++++
.../hdfs/server/namenode/sps/SPSService.java | 107 ++++++++++
.../namenode/sps/StoragePolicySatisfier.java | 175 +++++++---------
.../TestBlockStorageMovementAttemptedItems.java | 19 +-
.../sps/TestStoragePolicySatisfier.java | 111 ++++++----
...stStoragePolicySatisfierWithStripedFile.java | 19 +-
20 files changed, 938 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e97fb46..b8f49cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -93,8 +93,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.sps.Context;
-import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
+import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
@@ -441,7 +441,8 @@ public class BlockManager implements BlockStatsMXBean {
private final StoragePolicySatisfier sps;
private final boolean storagePolicyEnabled;
private boolean spsEnabled;
- private Context spsctxt = null;
+ private final SPSPathIds spsPaths;
+
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/
@@ -487,8 +488,8 @@ public class BlockManager implements BlockStatsMXBean {
conf.getBoolean(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
- spsctxt = new IntraSPSNameNodeContext(namesystem, this, conf);
- sps = new StoragePolicySatisfier(spsctxt);
+ sps = new StoragePolicySatisfier(conf);
+ spsPaths = new SPSPathIds();
blockTokenSecretManager = createBlockTokenSecretManager(conf);
providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
@@ -5034,8 +5035,7 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is already running.");
return;
}
- // TODO: FSDirectory will get removed via HDFS-12911 modularization work
- sps.start(false, namesystem.getFSDirectory());
+ sps.start(false);
}
/**
@@ -5071,8 +5071,7 @@ public class BlockManager implements BlockStatsMXBean {
LOG.info("Storage policy satisfier is already running.");
return;
}
- // TODO: FSDirectory will get removed via HDFS-12911 modularization work
- sps.start(true, namesystem.getFSDirectory());
+ sps.start(true);
}
/**
@@ -5112,4 +5111,48 @@ public class BlockManager implements BlockStatsMXBean {
String path) throws IOException {
return sps.checkStoragePolicySatisfyPathStatus(path);
}
+
+ /**
+ * @return SPS service instance.
+ */
+ public SPSService getSPSService() {
+ return this.sps;
+ }
+
+ /**
+ * @return the next SPS path id, i.e. the id of a path on which a user has
+ * requested storage policy satisfaction.
+ */
+ public Long getNextSPSPathId() {
+ return spsPaths.pollNext();
+ }
+
+ /**
+ * Removes the SPS path id from the list of sps paths.
+ */
+ public void removeSPSPathId(long trackId) {
+ spsPaths.remove(trackId);
+ }
+
+ /**
+ * Clean up all sps path ids.
+ */
+ public void removeAllSPSPathIds() {
+ spsPaths.clear();
+ }
+
+ /**
+ * Adds the sps path to SPSPathIds list.
+ */
+ public void addSPSPathId(long id) {
+ spsPaths.add(id);
+ }
+
+ /**
+ * @return true if SPS is enabled.
+ */
+ public boolean isSPSEnabled() {
+ return spsEnabled;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
index fb6eec9..eed6e52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import com.google.common.collect.Lists;
@@ -87,21 +86,14 @@ final class FSDirSatisfyStoragePolicyOp {
}
static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) {
- if (inode.isFile() && inode.asFile().numBlocks() != 0) {
- // Adding directly in the storageMovementNeeded queue, So it can
- // get more priority compare to directory.
- fsd.getBlockManager().getStoragePolicySatisfier()
- .satisfyStoragePolicy(inode.getId());
- return true;
- } else if (inode.isDirectory()
- && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) {
+ if (inode.isFile() && inode.asFile().numBlocks() == 0) {
+ return false;
+ } else {
// Adding directory in the pending queue, so FileInodeIdCollector process
// directory child in batch and recursively
- fsd.getBlockManager().getStoragePolicySatisfier()
- .addInodeToPendingDirQueue(inode.getId());
+ fsd.getBlockManager().addSPSPathId(inode.getId());
return true;
}
- return false;
}
private static boolean inodeHasSatisfyXAttr(INode inode) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c984653..f56502a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1406,14 +1406,16 @@ public class FSDirectory implements Closeable {
if (!inode.isSymlink()) {
final XAttrFeature xaf = inode.getXAttrFeature();
addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
- addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+ if (namesystem.getBlockManager().isSPSEnabled()) {
+ addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
+ }
}
}
}
private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
XAttrFeature xaf) {
- if (xaf == null || inode.isDirectory()) {
+ if (xaf == null) {
return;
}
XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 0c8a9be..7835760 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -258,6 +258,9 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
@@ -1287,7 +1290,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
edekCacheLoaderDelay, edekCacheLoaderInterval);
}
-
+ blockManager.getSPSService().init(
+ new IntraSPSNameNodeContext(this, blockManager,
+ blockManager.getSPSService()),
+ new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
+ blockManager.getSPSService()),
+ new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this));
blockManager.startSPS();
} finally {
startingActiveService = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
new file mode 100644
index 0000000..e6f78e1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMoveTaskHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+
+/**
+ * Interface for implementing different block moving approaches. One can
+ * connect directly to a DN and request a block move, and another can ask
+ * the NN to schedule the move via heart-beats.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface BlockMoveTaskHandler {
+
+ /**
+ * This is an interface method to handle the move tasks. BlockMovingInfo must
+ * contain the required info to move the block, that is, the source location,
+ * the destination location and the storage types.
+ */
+ void submitMoveTask(BlockMovingInfo blkMovingInfo,
+ BlockMovementListener blockMoveCompletionListener) throws IOException;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java
new file mode 100644
index 0000000..36473f3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockMovementListener.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Interface for notifying about block movement attempt completion.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface BlockMovementListener {
+
+ /**
+ * This method is used to notify the SPS that a block movement attempt has
+ * finished. The SPS will then re-check whether a retry is needed or not.
+ *
+ * @param moveAttemptFinishedBlks
+ * - list of blocks whose movement attempt has finished
+ */
+ void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
index 1cae027..3f0155d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java
@@ -32,7 +32,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +46,8 @@ import com.google.common.annotations.VisibleForTesting;
* finished for a longer time period, then such items will retries automatically
* after timeout. The default timeout would be 5 minutes.
*/
-public class BlockStorageMovementAttemptedItems {
+public class BlockStorageMovementAttemptedItems
+ implements BlockMovementListener {
private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@@ -71,19 +71,19 @@ public class BlockStorageMovementAttemptedItems {
//
private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded;
- private final Context ctxt;
+ private final SPSService service;
- public BlockStorageMovementAttemptedItems(Context context,
+ public BlockStorageMovementAttemptedItems(SPSService service,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
- this.ctxt = context;
- long recheckTimeout = ctxt.getConf().getLong(
+ this.service = service;
+ long recheckTimeout = this.service.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT);
if (recheckTimeout > 0) {
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
}
- this.selfRetryTimeout = ctxt.getConf().getLong(
+ this.selfRetryTimeout = this.service.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT);
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
@@ -111,7 +111,7 @@ public class BlockStorageMovementAttemptedItems {
* @param moveAttemptFinishedBlks
* storage movement attempt finished blocks
*/
- public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+ public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks.length == 0) {
return;
}
@@ -191,7 +191,7 @@ public class BlockStorageMovementAttemptedItems {
AttemptedItemInfo itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
- Long blockCollectionID = itemInfo.getTrackId();
+ Long blockCollectionID = itemInfo.getFileId();
synchronized (movementFinishedBlocks) {
ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
blockCollectionID, itemInfo.getRetryCount() + 1);
@@ -223,7 +223,7 @@ public class BlockStorageMovementAttemptedItems {
// gets the chance first and can be cleaned from queue quickly as
// all movements already done.
blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
- .getStartId(), attemptedItemInfo.getTrackId(),
+ .getStartId(), attemptedItemInfo.getFileId(),
attemptedItemInfo.getRetryCount() + 1));
iterator.remove();
}
@@ -246,7 +246,11 @@ public class BlockStorageMovementAttemptedItems {
}
public void clearQueues() {
- movementFinishedBlocks.clear();
- storageMovementAttemptedItems.clear();
+ synchronized (movementFinishedBlocks) {
+ movementFinishedBlocks.clear();
+ }
+ synchronized (storageMovementAttemptedItems) {
+ storageMovementAttemptedItems.clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 80f1893..39a0051 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -17,11 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode.sps;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY;
-
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -33,12 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
-import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
-import org.apache.hadoop.hdfs.server.namenode.INode;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@@ -75,22 +65,21 @@ public class BlockStorageMovementNeeded {
private final Context ctxt;
- // List of pending dir to satisfy the policy
- private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+ private Daemon pathIdCollector;
- private Daemon inodeIdCollector;
+ private FileIdCollector fileIDCollector;
- private final int maxQueuedItem;
+ private SPSPathIdProcessor pathIDProcessor;
// Amount of time to cache the SUCCESS status of path before turning it to
// NOT_AVAILABLE.
private static long statusClearanceElapsedTimeMs = 300000;
- public BlockStorageMovementNeeded(Context context) {
+ public BlockStorageMovementNeeded(Context context,
+ FileIdCollector fileIDCollector) {
this.ctxt = context;
- this.maxQueuedItem = ctxt.getConf().getInt(
- DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
- DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+ this.fileIDCollector = fileIDCollector;
+ pathIDProcessor = new SPSPathIdProcessor();
}
/**
@@ -140,29 +129,6 @@ public class BlockStorageMovementNeeded {
return storageMovementNeeded.poll();
}
- public synchronized void addToPendingDirQueue(long id) {
- spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.PENDING));
- spsDirsToBeTraveresed.add(id);
- // Notify waiting FileInodeIdCollector thread about the newly
- // added SPS path.
- synchronized (spsDirsToBeTraveresed) {
- spsDirsToBeTraveresed.notify();
- }
- }
-
- /**
- * Returns queue remaining capacity.
- */
- public synchronized int remainingCapacity() {
- int size = storageMovementNeeded.size();
- if (size >= maxQueuedItem) {
- return 0;
- } else {
- return (maxQueuedItem - size);
- }
- }
-
/**
* Returns queue size.
*/
@@ -171,7 +137,7 @@ public class BlockStorageMovementNeeded {
}
public synchronized void clearAll() {
- spsDirsToBeTraveresed.clear();
+ ctxt.removeAllSPSPathIds();
storageMovementNeeded.clear();
pendingWorkForDirectory.clear();
}
@@ -206,13 +172,13 @@ public class BlockStorageMovementNeeded {
} else {
// Remove xAttr if trackID doesn't exist in
// storageMovementAttemptedItems or file policy satisfied.
- ctxt.removeSPSHint(trackInfo.getTrackId());
+ ctxt.removeSPSHint(trackInfo.getFileId());
updateStatus(trackInfo.getStartId(), isSuccess);
}
}
public synchronized void clearQueue(long trackId) {
- spsDirsToBeTraveresed.remove(trackId);
+ ctxt.removeSPSPathId(trackId);
Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
while (iterator.hasNext()) {
ItemInfo next = iterator.next();
@@ -249,7 +215,7 @@ public class BlockStorageMovementNeeded {
public synchronized void clearQueuesWithNotification() {
// Remove xAttr from directories
Long trackId;
- while ((trackId = spsDirsToBeTraveresed.poll()) != null) {
+ while ((trackId = ctxt.getNextSPSPathId()) != null) {
try {
// Remove xAttr for file
ctxt.removeSPSHint(trackId);
@@ -265,12 +231,12 @@ public class BlockStorageMovementNeeded {
try {
// Remove xAttr for file
if (!itemInfo.isDir()) {
- ctxt.removeSPSHint(itemInfo.getTrackId());
+ ctxt.removeSPSHint(itemInfo.getFileId());
}
} catch (IOException ie) {
LOG.warn(
"Failed to remove SPS xattr for track id "
- + itemInfo.getTrackId(), ie);
+ + itemInfo.getFileId(), ie);
}
}
this.clearAll();
@@ -280,57 +246,33 @@ public class BlockStorageMovementNeeded {
* Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
* ID's to process for satisfy the policy.
*/
- private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
- implements Runnable {
-
- private int remainingCapacity = 0;
-
- private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
-
- StorageMovementPendingInodeIdCollector(FSDirectory dir) {
- super(dir);
- }
+ private class SPSPathIdProcessor implements Runnable {
@Override
public void run() {
LOG.info("Starting FileInodeIdCollector!.");
long lastStatusCleanTime = 0;
while (ctxt.isRunning()) {
+ LOG.info("Running FileInodeIdCollector!.");
try {
if (!ctxt.isInSafeMode()) {
- Long startINodeId = spsDirsToBeTraveresed.poll();
+ Long startINodeId = ctxt.getNextSPSPathId();
if (startINodeId == null) {
// Waiting for SPS path
- synchronized (spsDirsToBeTraveresed) {
- spsDirsToBeTraveresed.wait(5000);
- }
+ Thread.sleep(3000);
} else {
- INode startInode = getFSDirectory().getInode(startINodeId);
- if (startInode != null) {
- try {
- remainingCapacity = remainingCapacity();
- spsStatus.put(startINodeId,
- new StoragePolicySatisfyPathStatusInfo(
- StoragePolicySatisfyPathStatus.IN_PROGRESS));
- readLock();
- traverseDir(startInode.asDirectory(), startINodeId,
- HdfsFileStatus.EMPTY_NAME,
- new SPSTraverseInfo(startINodeId));
- } finally {
- readUnlock();
- }
- // Mark startInode traverse is done
- addAll(startInode.getId(), currentBatch, true);
- currentBatch.clear();
-
- // check if directory was empty and no child added to queue
- DirPendingWorkInfo dirPendingWorkInfo =
- pendingWorkForDirectory.get(startInode.getId());
- if (dirPendingWorkInfo.isDirWorkDone()) {
- ctxt.removeSPSHint(startInode.getId());
- pendingWorkForDirectory.remove(startInode.getId());
- updateStatus(startInode.getId(), true);
- }
+ spsStatus.put(startINodeId,
+ new StoragePolicySatisfyPathStatusInfo(
+ StoragePolicySatisfyPathStatus.IN_PROGRESS));
+ fileIDCollector.scanAndCollectFileIds(startINodeId);
+ // check if directory was empty and no child added to queue
+ DirPendingWorkInfo dirPendingWorkInfo =
+ pendingWorkForDirectory.get(startINodeId);
+ if (dirPendingWorkInfo != null
+ && dirPendingWorkInfo.isDirWorkDone()) {
+ ctxt.removeSPSHint(startINodeId);
+ pendingWorkForDirectory.remove(startINodeId);
+ updateStatus(startINodeId, true);
}
}
//Clear the SPS status if status is in SUCCESS more than 5 min.
@@ -355,71 +297,6 @@ public class BlockStorageMovementNeeded {
}
}
}
-
- @Override
- protected void checkPauseForTesting() throws InterruptedException {
- // TODO implement if needed
- }
-
- @Override
- protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
- throws IOException, InterruptedException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing {} for statisy the policy",
- inode.getFullPathName());
- }
- if (!inode.isFile()) {
- return false;
- }
- if (inode.isFile() && inode.asFile().numBlocks() != 0) {
- currentBatch.add(new ItemInfo(
- ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
- remainingCapacity--;
- }
- return true;
- }
-
- @Override
- protected boolean canSubmitCurrentBatch() {
- return remainingCapacity <= 0;
- }
-
- @Override
- protected void checkINodeReady(long startId) throws IOException {
- // SPS work won't be scheduled if NN is in standby. So, skipping NN
- // standby check.
- return;
- }
-
- @Override
- protected void submitCurrentBatch(long startId)
- throws IOException, InterruptedException {
- // Add current child's to queue
- addAll(startId, currentBatch, false);
- currentBatch.clear();
- }
-
- @Override
- protected void throttle() throws InterruptedException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
- + " waiting for some free slots.");
- }
- remainingCapacity = remainingCapacity();
- // wait for queue to be free
- while (remainingCapacity <= 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting for storageMovementNeeded queue to be free!");
- }
- Thread.sleep(5000);
- remainingCapacity = remainingCapacity();
- }
- }
-
- @Override
- protected boolean canTraverseDir(INode inode) throws IOException {
- return true;
- }
}
/**
@@ -476,29 +353,15 @@ public class BlockStorageMovementNeeded {
}
}
- // TODO: FSDirectory will get removed via HDFS-12911 modularization work
- public void init(FSDirectory fsd) {
- inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
- fsd));
- inodeIdCollector.setName("FileInodeIdCollector");
- inodeIdCollector.start();
+ public void activate() {
+ pathIdCollector = new Daemon(pathIDProcessor);
+ pathIdCollector.setName("SPSPathIdProcessor");
+ pathIdCollector.start();
}
public void close() {
- if (inodeIdCollector != null) {
- inodeIdCollector.interrupt();
- }
- }
-
- class SPSTraverseInfo extends TraverseInfo {
- private long startId;
-
- SPSTraverseInfo(long startId) {
- this.startId = startId;
- }
-
- public long getStartId() {
- return startId;
+ if (pathIdCollector != null) {
+ pathIdCollector.interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index d11e26f..b7053b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -19,11 +19,9 @@
package org.apache.hadoop.hdfs.server.namenode.sps;
import java.io.IOException;
-import java.util.function.Supplier;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -31,7 +29,6 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
@@ -43,24 +40,11 @@ import org.apache.hadoop.security.AccessControlException;
public interface Context {
/**
- * Returns configuration object.
- */
- Configuration getConf();
-
- /**
* Returns true if the SPS is running, false otherwise.
*/
boolean isRunning();
/**
- * Update the SPS running status.
- *
- * @param isSpsRunning
- * true represents running, false otherwise
- */
- void setSPSRunning(Supplier<Boolean> isSpsRunning);
-
- /**
* Returns true if the Namenode in safe mode, false otherwise.
*/
boolean isInSafeMode();
@@ -153,17 +137,6 @@ public interface Context {
boolean hasLowRedundancyBlocks(long inodeID);
/**
- * Assign the given block movement task to the target node present in
- * {@link BlockMovingInfo}.
- *
- * @param blkMovingInfo
- * block to storage info
- * @throws IOException
- */
- void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
- throws IOException;
-
- /**
* Checks whether the given datanode has sufficient space to occupy the given
* blockSize data.
*
@@ -178,4 +151,20 @@ public interface Context {
*/
boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long blockSize);
+
+ /**
+ * @return next SPS path id to process.
+ */
+ Long getNextSPSPathId();
+
+ /**
+ * Removes the SPS path id.
+ */
+ void removeSPSPathId(long pathId);
+
+ /**
+ * Removes all SPS path ids.
+ */
+ void removeAllSPSPathIds();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
new file mode 100644
index 0000000..7cf77f0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/FileIdCollector.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * An interface for scanning a directory recursively and collecting the file
+ * ids under the given directory.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FileIdCollector {
+
+ /**
+ * Scans the given inode directory and collects the file ids under that
+ * directory and adds to the given BlockStorageMovementNeeded.
+ *
+ * @param inodeId
+ * - The directory ID
+ */
+ void scanAndCollectFileIds(Long inodeId)
+ throws IOException, InterruptedException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
new file mode 100644
index 0000000..1da4af9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+
+/**
+ * This class handles the internal SPS block movements. This will assign block
+ * movement tasks to target datanode descriptors.
+ */
+public class IntraSPSNameNodeBlockMoveTaskHandler
+ implements BlockMoveTaskHandler {
+
+ private BlockManager blockManager;
+ private Namesystem namesystem;
+
+ public IntraSPSNameNodeBlockMoveTaskHandler(BlockManager blockManager,
+ Namesystem namesytem) {
+ this.blockManager = blockManager;
+ this.namesystem = namesytem;
+ }
+
+ @Override
+ public void submitMoveTask(BlockMovingInfo blkMovingInfo,
+ BlockMovementListener blockMoveCompletionListener) throws IOException {
+ namesystem.readLock();
+ try {
+ DatanodeDescriptor dn = blockManager.getDatanodeManager()
+ .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
+ if (dn == null) {
+ throw new IOException("Failed to schedule block movement task:"
+ + blkMovingInfo + " as target datanode: "
+ + blkMovingInfo.getTarget() + " doesn't exists");
+ }
+ dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
+ dn.addBlocksToMoveStorage(blkMovingInfo);
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 6654212..cef26ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -20,10 +20,8 @@ package org.apache.hadoop.hdfs.server.namenode.sps;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
import java.io.IOException;
-import java.util.function.Supplier;
import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -38,7 +36,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AccessControlException;
import org.slf4j.Logger;
@@ -55,15 +52,14 @@ public class IntraSPSNameNodeContext implements Context {
private final Namesystem namesystem;
private final BlockManager blockManager;
- private final Configuration conf;
- private Supplier<Boolean> isSpsRunning;
+
+ private SPSService service;
public IntraSPSNameNodeContext(Namesystem namesystem,
- BlockManager blockManager, Configuration conf) {
+ BlockManager blockManager, SPSService service) {
this.namesystem = namesystem;
this.blockManager = blockManager;
- this.conf = conf;
- isSpsRunning = () -> false;
+ this.service = service;
}
@Override
@@ -111,11 +107,6 @@ public class IntraSPSNameNodeContext implements Context {
}
@Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
public boolean isFileExist(long inodeId) {
return namesystem.getFSDirectory().getInode(inodeId) != null;
}
@@ -127,16 +118,7 @@ public class IntraSPSNameNodeContext implements Context {
@Override
public boolean isRunning() {
- // TODO : 'isSpsRunning' flag has been added to avoid the NN lock inside
- // SPS. Context interface will be further refined as part of HDFS-12911
- // modularization task. One idea is to introduce a cleaner interface similar
- // to Namesystem for better abstraction.
- return namesystem.isRunning() && isSpsRunning.get();
- }
-
- @Override
- public void setSPSRunning(Supplier<Boolean> spsRunningFlag) {
- this.isSpsRunning = spsRunningFlag;
+ return namesystem.isRunning() && service.isRunning();
}
@Override
@@ -183,25 +165,6 @@ public class IntraSPSNameNodeContext implements Context {
}
@Override
- public void assignBlockMoveTaskToTargetNode(BlockMovingInfo blkMovingInfo)
- throws IOException {
- namesystem.readLock();
- try {
- DatanodeDescriptor dn = blockManager.getDatanodeManager()
- .getDatanode(blkMovingInfo.getTarget().getDatanodeUuid());
- if (dn == null) {
- throw new IOException("Failed to schedule block movement task:"
- + blkMovingInfo + " as target datanode: "
- + blkMovingInfo.getTarget() + " doesn't exists");
- }
- dn.addBlocksToMoveStorage(blkMovingInfo);
- dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
- } finally {
- namesystem.readUnlock();
- }
- }
-
- @Override
public boolean verifyTargetDatanodeHasSpaceForScheduling(DatanodeInfo dn,
StorageType type, long blockSize) {
namesystem.readLock();
@@ -217,4 +180,19 @@ public class IntraSPSNameNodeContext implements Context {
namesystem.readUnlock();
}
}
+
+ @Override
+ public Long getNextSPSPathId() {
+ return blockManager.getNextSPSPathId();
+ }
+
+ @Override
+ public void removeSPSPathId(long trackId) {
+ blockManager.removeSPSPathId(trackId);
+ }
+
+ @Override
+ public void removeAllSPSPathIds() {
+ blockManager.removeAllSPSPathIds();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
new file mode 100644
index 0000000..c6834c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+
+/**
+ * A specific implementation for scanning the directory with Namenode internal
+ * Inode structure and collects the file ids under the given directory ID.
+ */
+public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
+ implements FileIdCollector {
+ private int maxQueueLimitToScan;
+ private final SPSService service;
+
+ private int remainingCapacity = 0;
+
+ private List<ItemInfo> currentBatch;
+
+ public IntraSPSNameNodeFileIdCollector(FSDirectory dir, SPSService service) {
+ super(dir);
+ this.service = service;
+ this.maxQueueLimitToScan = service.getConf().getInt(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT);
+ currentBatch = new ArrayList<>(maxQueueLimitToScan);
+ }
+
+ @Override
+ protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+ throws IOException, InterruptedException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing {} for statisy the policy",
+ inode.getFullPathName());
+ }
+ if (!inode.isFile()) {
+ return false;
+ }
+ if (inode.isFile() && inode.asFile().numBlocks() != 0) {
+ currentBatch.add(new ItemInfo(
+ ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+ remainingCapacity--;
+ }
+ return true;
+ }
+
+ @Override
+ protected boolean canSubmitCurrentBatch() {
+ return remainingCapacity <= 0;
+ }
+
+ @Override
+ protected void checkINodeReady(long startId) throws IOException {
+ // SPS work won't be scheduled if NN is in standby. So, skipping NN
+ // standby check.
+ return;
+ }
+
+ @Override
+ protected void submitCurrentBatch(long startId)
+ throws IOException, InterruptedException {
+ // Add the current batch of children to the queue
+ service.addAllFileIdsToProcess(startId,
+ currentBatch, false);
+ currentBatch.clear();
+ }
+
+ @Override
+ protected void throttle() throws InterruptedException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+ + " waiting for some free slots.");
+ }
+ remainingCapacity = remainingCapacity();
+ // wait for queue to be free
+ while (remainingCapacity <= 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+ }
+ Thread.sleep(5000);
+ remainingCapacity = remainingCapacity();
+ }
+ }
+
+ @Override
+ protected boolean canTraverseDir(INode inode) throws IOException {
+ return true;
+ }
+
+ @Override
+ protected void checkPauseForTesting() throws InterruptedException {
+ // Nothing to do
+ }
+
+ @Override
+ public void scanAndCollectFileIds(final Long startINodeId)
+ throws IOException, InterruptedException {
+ FSDirectory fsd = getFSDirectory();
+ INode startInode = fsd.getInode(startINodeId);
+ if (startInode != null) {
+ remainingCapacity = remainingCapacity();
+ if (remainingCapacity == 0) {
+ throttle();
+ }
+ if (startInode.isFile()) {
+ currentBatch.add(new ItemInfo(startInode.getId(), startInode.getId()));
+ } else {
+
+ readLock();
+ // NOTE: this lock is not held for the whole directory scan; the locking
+ // is sliced. Once a batch of file ids has been collected (at most
+ // maxQueueLimitToScan, default 1000), the lock is released and the
+ // current batch is submitted to SPSService. Once
+ // service.processingQueueSize() shows free slots, the lock is
+ // re-acquired and the scan resumes. This logic was re-used from the
+ // EDEK feature.
+ try {
+ traverseDir(startInode.asDirectory(), startINodeId,
+ HdfsFileStatus.EMPTY_NAME, new SPSTraverseInfo(startINodeId));
+ } finally {
+ readUnlock();
+ }
+ }
+ // Mark startInode traverse is done, this is last-batch
+ service.addAllFileIdsToProcess(startInode.getId(), currentBatch, true);
+ currentBatch.clear();
+ }
+ }
+
+ /**
+ * Returns queue remaining capacity.
+ */
+ public synchronized int remainingCapacity() {
+ int size = service.processingQueueSize();
+ if (size >= maxQueueLimitToScan) {
+ return 0;
+ } else {
+ return (maxQueueLimitToScan - size);
+ }
+ }
+
+ class SPSTraverseInfo extends TraverseInfo {
+ private long startId;
+
+ SPSTraverseInfo(long startId) {
+ this.startId = startId;
+ }
+
+ public long getStartId() {
+ return startId;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
new file mode 100644
index 0000000..47c64cc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/ItemInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * ItemInfo is a file info object for which the policy needs to be satisfied.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ItemInfo {
+ private long startId;
+ private long fileId;
+ private int retryCount;
+
+ public ItemInfo(long startId, long fileId) {
+ this.startId = startId;
+ this.fileId = fileId;
+ // set 0 when item is getting added first time in queue.
+ this.retryCount = 0;
+ }
+
+ public ItemInfo(final long startId, final long fileId, final int retryCount) {
+ this.startId = startId;
+ this.fileId = fileId;
+ this.retryCount = retryCount;
+ }
+
+ /**
+ * Return the start inode id of the current track Id. This indicates that SPS
+ * was invoked on this inode id.
+ */
+ public long getStartId() {
+ return startId;
+ }
+
+ /**
+ * Return the file inode id for which the policy needs to be satisfied.
+ */
+ public long getFileId() {
+ return fileId;
+ }
+
+ /**
+ * Returns true if the tracking path is a directory, false otherwise.
+ */
+ public boolean isDir() {
+ return (startId != fileId);
+ }
+
+ /**
+ * Get the number of retries attempted so far to satisfy the policy.
+ */
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ /**
+ * Increments the retry count.
+ */
+ public void increRetryCount() {
+ this.retryCount++;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
new file mode 100644
index 0000000..cd6ad22
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class which holds the SPS invoked path ids.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SPSPathIds {
+
+ // Queue of pending path ids for which the policy is to be satisfied
+ private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
+
+ /**
+ * Add the path id to queue.
+ */
+ public synchronized void add(long pathId) {
+ spsDirsToBeTraveresed.add(pathId);
+ }
+
+ /**
+ * Removes the path id.
+ */
+ public synchronized void remove(long pathId) {
+ spsDirsToBeTraveresed.remove(pathId);
+ }
+
+ /**
+ * Clears all path ids.
+ */
+ public synchronized void clear() {
+ spsDirsToBeTraveresed.clear();
+ }
+
+ /**
+ * @return next path id available in queue.
+ */
+ public synchronized Long pollNext() {
+ return spsDirsToBeTraveresed.poll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
new file mode 100644
index 0000000..6d85ea6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An interface for SPSService, which exposes life cycle and processing APIs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface SPSService {
+
+ /**
+ * Initializes the helper services.
+ *
+ * @param ctxt
+ * - context is a helper service that provides a communication channel
+ * between NN and SPS
+ * @param fileIDCollector
+ * - a helper service for scanning the files under a given directory
+ * id
+ * @param handler
+ * - a helper service for moving the blocks
+ */
+ void init(Context ctxt, FileIdCollector fileIDCollector,
+ BlockMoveTaskHandler handler);
+
+ /**
+ * Starts the SPS service. Make sure to initialize the helper services before
+ * invoking this method.
+ *
+ * @param reconfigStart
+ * - to indicate whether the SPS startup requested from
+ * reconfiguration service
+ */
+ void start(boolean reconfigStart);
+
+ /**
+ * Stops the SPS service gracefully. Timed wait to stop storage policy
+ * satisfier daemon threads.
+ */
+ void stopGracefully();
+
+ /**
+ * Disable the SPS service.
+ *
+ * @param forceStop true to stop SPS by clearing all pending SPS work
+ */
+ void disable(boolean forceStop);
+
+ /**
+ * Check whether StoragePolicySatisfier is running.
+ *
+ * @return true if running
+ */
+ boolean isRunning();
+
+ /**
+ * Adds the Item information(file id etc) to processing queue.
+ *
+ * @param itemInfo
+ */
+ void addFileIdToProcess(ItemInfo itemInfo);
+
+ /**
+ * Adds all the Item information(file id etc) to processing queue.
+ *
+ * @param startId
+ * - directory/file id, on which SPS was called.
+ * @param itemInfoList
+ * - list of item infos
+ * @param scanCompleted
+ * - whether the scanning of directory fully done with itemInfoList
+ */
+ void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+ boolean scanCompleted);
+
+ /**
+ * @return current processing queue size.
+ */
+ int processingQueueSize();
+
+ /**
+ * @return the configuration.
+ */
+ Configuration getConf();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index b3e6b78..28c1372 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -47,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.balancer.Matcher;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -64,28 +64,34 @@ import com.google.common.annotations.VisibleForTesting;
* storage policy type in Namespace, but physical block storage movement will
* not happen until user runs "Mover Tool" explicitly for such files. The
* StoragePolicySatisfier Daemon thread implemented for addressing the case
- * where users may want to physically move the blocks by HDFS itself instead of
- * running mover tool explicitly. Just calling client API to
- * satisfyStoragePolicy on a file/dir will automatically trigger to move its
- * physical storage locations as expected in asynchronous manner. Here Namenode
- * will pick the file blocks which are expecting to change its storages, then it
- * will build the mapping of source block location and expected storage type and
- * location to move. After that this class will also prepare commands to send to
- * Datanode for processing the physical block movements.
+ * where users may want to physically move the blocks by a dedicated daemon (can
+ * run inside Namenode or stand alone) instead of running mover tool explicitly.
+ * Just calling client API to satisfyStoragePolicy on a file/dir will
+ * automatically trigger to move its physical storage locations as expected in
+ * asynchronous manner. Here SPS will pick the file blocks which are expecting
+ * to change its storages, then it will build the mapping of source block
+ * location and expected storage type and location to move. After that this
+ * class will also prepare requests to send to Datanode for processing the
+ * physical block movements.
*/
@InterfaceAudience.Private
-public class StoragePolicySatisfier implements Runnable {
+public class StoragePolicySatisfier implements SPSService, Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(StoragePolicySatisfier.class);
private Daemon storagePolicySatisfierThread;
- private final BlockStorageMovementNeeded storageMovementNeeded;
- private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
+ private BlockStorageMovementNeeded storageMovementNeeded;
+ private BlockStorageMovementAttemptedItems storageMovementsMonitor;
private volatile boolean isRunning = false;
private int spsWorkMultiplier;
private long blockCount = 0L;
private int blockMovementMaxRetry;
- private final Context ctxt;
+ private Context ctxt;
+ private BlockMoveTaskHandler blockMoveTaskHandler;
+ private Configuration conf;
+ public StoragePolicySatisfier(Configuration conf) {
+ this.conf = conf;
+ }
/**
* Represents the collective analysis status for all blocks.
*/
@@ -125,13 +131,17 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- public StoragePolicySatisfier(Context ctxt) {
- this.ctxt = ctxt;
- this.storageMovementNeeded = new BlockStorageMovementNeeded(ctxt);
- this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(ctxt,
+ public void init(final Context context, final FileIdCollector fileIDCollector,
+ final BlockMoveTaskHandler blockMovementTaskHandler) {
+ this.ctxt = context;
+ this.storageMovementNeeded =
+ new BlockStorageMovementNeeded(context, fileIDCollector);
+ this.storageMovementsMonitor =
+ new BlockStorageMovementAttemptedItems(this,
storageMovementNeeded);
- this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(ctxt.getConf());
- this.blockMovementMaxRetry = ctxt.getConf().getInt(
+ this.blockMoveTaskHandler = blockMovementTaskHandler;
+ this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
+ this.blockMovementMaxRetry = getConf().getInt(
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY,
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT);
}
@@ -139,12 +149,10 @@ public class StoragePolicySatisfier implements Runnable {
/**
* Start storage policy satisfier daemon thread. Also start block storage
* movements monitor to retry the attempts if needed.
- *
- * // TODO: FSDirectory will get removed via HDFS-12911 modularization work.
*/
- public synchronized void start(boolean reconfigStart, FSDirectory fsd) {
+ @Override
+ public synchronized void start(boolean reconfigStart) {
isRunning = true;
- ctxt.setSPSRunning(this::isRunning);
if (ctxt.isMoverRunning()) {
isRunning = false;
LOG.error(
@@ -163,20 +171,14 @@ public class StoragePolicySatisfier implements Runnable {
// Ensure that all the previously submitted block movements (if any) are
// stopped in all datanodes.
addDropSPSWorkCommandsToAllDNs();
- storageMovementNeeded.init(fsd);
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
this.storageMovementsMonitor.start();
+ this.storageMovementNeeded.activate();
}
- /**
- * Disables storage policy satisfier by stopping its services.
- *
- * @param forceStop
- * true represents that it should stop SPS service by clearing all
- * pending SPS work
- */
+ @Override
public synchronized void disable(boolean forceStop) {
isRunning = false;
if (storagePolicySatisfierThread == null) {
@@ -195,14 +197,15 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- /**
- * Timed wait to stop storage policy satisfier daemon threads.
- */
+ @Override
public synchronized void stopGracefully() {
if (isRunning) {
disable(true);
}
- this.storageMovementsMonitor.stopGracefully();
+
+ if (this.storageMovementsMonitor != null) {
+ this.storageMovementsMonitor.stopGracefully();
+ }
if (storagePolicySatisfierThread == null) {
return;
@@ -213,10 +216,7 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- /**
- * Check whether StoragePolicySatisfier is running.
- * @return true if running
- */
+ @Override
public boolean isRunning() {
return isRunning;
}
@@ -239,11 +239,11 @@ public class StoragePolicySatisfier implements Runnable {
if(itemInfo.getRetryCount() >= blockMovementMaxRetry){
LOG.info("Failed to satisfy the policy after "
+ blockMovementMaxRetry + " retries. Removing inode "
- + itemInfo.getTrackId() + " from the queue");
+ + itemInfo.getFileId() + " from the queue");
storageMovementNeeded.removeItemTrackInfo(itemInfo, false);
continue;
}
- long trackId = itemInfo.getTrackId();
+ long trackId = itemInfo.getFileId();
BlocksMovingAnalysis status = null;
DatanodeStorageReport[] liveDnReports;
BlockStoragePolicy existingStoragePolicy;
@@ -273,7 +273,7 @@ public class StoragePolicySatisfier implements Runnable {
// be removed on storage movement attempt finished report.
case BLOCKS_TARGETS_PAIRED:
this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
- .getStartId(), itemInfo.getTrackId(), monotonicNow(),
+ .getStartId(), itemInfo.getFileId(), monotonicNow(),
status.assignedBlocks, itemInfo.getRetryCount()));
break;
case NO_BLOCKS_TARGETS_PAIRED:
@@ -282,7 +282,7 @@ public class StoragePolicySatisfier implements Runnable {
+ " back to retry queue as none of the blocks"
+ " found its eligible targets.");
}
- itemInfo.retryCount++;
+ itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case FEW_LOW_REDUNDANCY_BLOCKS:
@@ -426,7 +426,8 @@ public class StoragePolicySatisfier implements Runnable {
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
try {
- ctxt.assignBlockMoveTaskToTargetNode(blkMovingInfo);
+ blockMoveTaskHandler.submitMoveTask(blkMovingInfo,
+ storageMovementsMonitor);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
assignedBlockIds.add(blkMovingInfo.getBlock());
blockCount++;
@@ -611,7 +612,6 @@ public class StoragePolicySatisfier implements Runnable {
expected.remove(chosenTarget.storageType);
excludeNodes.add(chosenTarget.dn);
- // TODO: We can increment scheduled block count for this node?
} else {
LOG.warn(
"Failed to choose target datanode for the required"
@@ -830,11 +830,11 @@ public class StoragePolicySatisfier implements Runnable {
return;
}
storageMovementsMonitor
- .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
+ .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks());
}
@VisibleForTesting
- BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
+ BlockMovementListener getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
@@ -863,10 +863,6 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- public void addInodeToPendingDirQueue(long id) {
- storageMovementNeeded.addToPendingDirQueue(id);
- }
-
/**
* Clear queues for given track id.
*/
@@ -875,57 +871,6 @@ public class StoragePolicySatisfier implements Runnable {
}
/**
- * ItemInfo is a file info object for which need to satisfy the
- * policy.
- */
- public static class ItemInfo {
- private long startId;
- private long trackId;
- private int retryCount;
-
- public ItemInfo(long startId, long trackId) {
- this.startId = startId;
- this.trackId = trackId;
- //set 0 when item is getting added first time in queue.
- this.retryCount = 0;
- }
-
- public ItemInfo(long startId, long trackId, int retryCount) {
- this.startId = startId;
- this.trackId = trackId;
- this.retryCount = retryCount;
- }
-
- /**
- * Return the start inode id of the current track Id.
- */
- public long getStartId() {
- return startId;
- }
-
- /**
- * Return the File inode Id for which needs to satisfy the policy.
- */
- public long getTrackId() {
- return trackId;
- }
-
- /**
- * Returns true if the tracking path is a directory, false otherwise.
- */
- public boolean isDir() {
- return (startId != trackId);
- }
-
- /**
- * Get the attempted retry count of the block for satisfy the policy.
- */
- public int getRetryCount() {
- return retryCount;
- }
- }
-
- /**
* This class contains information about attempted blocks and their last
* attempted or reported time stamp. This is used by
* {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
@@ -977,4 +922,30 @@ public class StoragePolicySatisfier implements Runnable {
String path) throws IOException {
return storageMovementNeeded.getStatus(ctxt.getFileID(path));
}
+
+ @Override
+ public void addFileIdToProcess(ItemInfo trackInfo) {
+ storageMovementNeeded.add(trackInfo);
+ }
+
+ @Override
+ public void addAllFileIdsToProcess(long startId, List<ItemInfo> itemInfoList,
+ boolean scanCompleted) {
+ getStorageMovementQueue().addAll(startId, itemInfoList, scanCompleted);
+ }
+
+ @Override
+ public int processingQueueSize() {
+ return storageMovementNeeded.size();
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @VisibleForTesting
+ public BlockStorageMovementNeeded getStorageMovementQueue() {
+ return storageMovementNeeded;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e18821d7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
index f9762a8..3e2c324 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,12 +48,14 @@ public class TestBlockStorageMovementAttemptedItems {
public void setup() throws Exception {
Configuration config = new HdfsConfiguration();
Context ctxt = Mockito.mock(Context.class);
- Mockito.when(ctxt.getConf()).thenReturn(config);
+ SPSService sps = Mockito.mock(StoragePolicySatisfier.class);
+ Mockito.when(sps.getConf()).thenReturn(config);
Mockito.when(ctxt.isRunning()).thenReturn(true);
Mockito.when(ctxt.isInSafeMode()).thenReturn(false);
Mockito.when(ctxt.isFileExist(Mockito.anyLong())).thenReturn(true);
- unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(ctxt);
- bsmAttemptedItems = new BlockStorageMovementAttemptedItems(ctxt,
+ unsatisfiedStorageMovementFiles =
+ new BlockStorageMovementNeeded(ctxt, null);
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
unsatisfiedStorageMovementFiles);
}
@@ -73,7 +74,7 @@ public class TestBlockStorageMovementAttemptedItems {
while (monotonicNow() < (stopTime)) {
ItemInfo ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
- if (item == ele.getTrackId()) {
+ if (item == ele.getFileId()) {
isItemFound = true;
break;
}
@@ -99,7 +100,7 @@ public class TestBlockStorageMovementAttemptedItems {
bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
Block[] blockArray = new Block[blocks.size()];
blocks.toArray(blockArray);
- bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+ bsmAttemptedItems.notifyMovementTriedBlocks(blockArray);
assertEquals("Failed to receive result!", 1,
bsmAttemptedItems.getMovementFinishedBlocksCount());
}
@@ -137,7 +138,7 @@ public class TestBlockStorageMovementAttemptedItems {
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+ bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
// start block movement report monitor thread
bsmAttemptedItems.start();
@@ -162,7 +163,7 @@ public class TestBlockStorageMovementAttemptedItems {
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+ bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
@@ -190,7 +191,7 @@ public class TestBlockStorageMovementAttemptedItems {
.add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
Block[] blksMovementReport = new Block[1];
blksMovementReport[0] = new Block(item);
- bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+ bsmAttemptedItems.notifyMovementTriedBlocks(blksMovementReport);
assertFalse(
"Should not add in queue again if it is not there in"
+ " storageMovementAttemptedItems",
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org