Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/12/11 07:15:59 UTC
[11/50] [abbrv] hadoop git commit: HDFS-11151. [SPS]: StoragePolicySatisfier should gracefully handle when there is no target node with the required storage type. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a9adc9d7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a9adc9d7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a9adc9d7
Branch: refs/heads/HDFS-10285
Commit: a9adc9d7fbdb562b720c1123694208cbe6f792a7
Parents: 4d2d053
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Sun Nov 27 11:15:26 2016 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Mon Dec 11 11:20:35 2017 +0530
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 2 +-
.../datanode/BlockStorageMovementTracker.java | 30 ++++--
.../datanode/StoragePolicySatisfyWorker.java | 20 +++-
.../BlockStorageMovementAttemptedItems.java | 4 +
.../server/namenode/StoragePolicySatisfier.java | 53 ++++++---
.../namenode/TestStoragePolicySatisfier.java | 108 ++++++++++++++++++-
6 files changed, 186 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
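The core idea of the change, sketched below in minimal standalone Java (the Move class and the method names here are hypothetical stand-ins, not the real Hadoop classes): a source replica that gets no target with the required storage type keeps an empty target slot, the datanode-side tracker marks it as a movement failure, and the namenode can later retry it.

    import java.util.List;

    class NoTargetHandlingSketch {

      // Hypothetical stand-in for BlockMovingInfo: target is null when no
      // datanode with the required storage type could be chosen.
      static final class Move {
        final long blockId;
        final String target;

        Move(long blockId, String target) {
          this.blockId = blockId;
          this.target = target;
        }
      }

      static void process(long trackId, List<Move> moves) {
        for (Move m : moves) {
          if (m.target == null) {
            // In the real code this becomes a BlockMovementResult with
            // status DN_BLK_STORAGE_MOVEMENT_FAILURE, which the namenode's
            // attempted-items monitor uses to re-queue the block.
            System.out.println("track " + trackId + ": fail blk_" + m.blockId);
          } else {
            System.out.println("track " + trackId + ": move blk_" + m.blockId
                + " -> " + m.target);
          }
        }
      }
    }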
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9adc9d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a5edbce..a0fe450 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4921,7 +4921,7 @@ public class BlockManager implements BlockStatsMXBean {
*/
public void satisfyStoragePolicy(long id) {
storageMovementNeeded.add(id);
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Added block collection id {} to block "
+ "storageMovementNeeded queue", id);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9adc9d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index d31f075..2de88fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -27,8 +27,9 @@ import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,15 +109,32 @@ public class BlockStorageMovementTracker implements Runnable {
}
}
+ /**
+ * Marks the block movement as failed for the given trackId and blockId.
+ *
+ * @param trackId tracking id
+ * @param blockId block id
+ */
+ void markBlockMovementFailure(long trackId, long blockId) {
+ LOG.debug("Mark as block movement failure for the given "
+ + "trackId:{} and blockId:{}", trackId, blockId);
+ BlockMovementResult result = new BlockMovementResult(trackId, blockId, null,
+ BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
+ addMovementResultToTrackIdList(result);
+ }
+
private List<BlockMovementResult> addMovementResultToTrackIdList(
BlockMovementResult result) {
long trackId = result.getTrackId();
- List<BlockMovementResult> perTrackIdList = movementResults.get(trackId);
- if (perTrackIdList == null) {
- perTrackIdList = new ArrayList<>();
- movementResults.put(trackId, perTrackIdList);
+ List<BlockMovementResult> perTrackIdList;
+ synchronized (movementResults) {
+ perTrackIdList = movementResults.get(trackId);
+ if (perTrackIdList == null) {
+ perTrackIdList = new ArrayList<>();
+ movementResults.put(trackId, perTrackIdList);
+ }
+ perTrackIdList.add(result);
}
- perTrackIdList.add(result);
return perTrackIdList;
}
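For context, the synchronized block introduced above closes a race in which mover threads and markBlockMovementFailure() can append results for the same trackId concurrently. A self-contained sketch of the same lazy-init-then-append pattern, assuming a generic result type R in place of BlockMovementResult (with a ConcurrentHashMap, Map.computeIfAbsent would be an idiomatic alternative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ResultAccumulator<R> {
      private final Map<Long, List<R>> resultsByTrackId = new HashMap<>();

      // Thread-safe lazy-init append, mirroring the shape of
      // addMovementResultToTrackIdList above.
      List<R> add(long trackId, R result) {
        synchronized (resultsByTrackId) {
          List<R> list = resultsByTrackId.get(trackId);
          if (list == null) {
            list = new ArrayList<>();
            resultsByTrackId.put(trackId, list);
          }
          list.add(result);
          return list;
        }
      }
    }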
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9adc9d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 1bd851e..a69a38b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -151,14 +151,24 @@ public class StoragePolicySatisfyWorker {
*/
public void processBlockMovingTasks(long trackID, String blockPoolID,
Collection<BlockMovingInfo> blockMovingInfos) {
+ LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- assert blkMovingInfo
- .getSources().length == blkMovingInfo.getTargets().length;
-
- for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
+ // Iterating backwards. This ensures that every block src location
+ // which doesn't have a target node is marked as failed before
+ // scheduling the block movement to the valid target nodes.
+ for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
+ if (i >= blkMovingInfo.getTargets().length) {
+ // Since there is no target selected for scheduling the block,
+ // just mark this block storage movement as failure. Later, namenode
+ // can take action on this.
+ movementTracker.markBlockMovementFailure(trackID,
+ blkMovingInfo.getBlock().getBlockId());
+ continue;
+ }
+ DatanodeInfo target = blkMovingInfo.getTargets()[i];
BlockMovingTask blockMovingTask = new BlockMovingTask(
trackID, blockPoolID, blkMovingInfo.getBlock(),
- blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
+ blkMovingInfo.getSources()[i], target,
blkMovingInfo.getSourceStorageTypes()[i],
blkMovingInfo.getTargetStorageTypes()[i]);
Future<BlockMovementResult> moveCallable = moverCompletionService
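The backward loop above covers the case where sources outnumber targets: indices past targets.length - 1 are failed immediately, and only the paired prefix is scheduled. A standalone sketch of just that indexing, with hypothetical String arrays standing in for the datanode arrays:

    class PairingSketch {

      // Pairs sources[i] with targets[i]; any source without a matching
      // target is reported as failed before the paired moves are handled,
      // mirroring the backward iteration in processBlockMovingTasks.
      static void process(String[] sources, String[] targets) {
        for (int i = sources.length - 1; i >= 0; i--) {
          if (i >= targets.length) {
            System.out.println("mark failure for source " + sources[i]);
            continue;
          }
          System.out.println("schedule move " + sources[i]
              + " -> " + targets[i]);
        }
      }

      public static void main(String[] args) {
        // Three sources, one target: prints failures for C and B first,
        // then schedules A -> D.
        process(new String[] {"A", "B", "C"}, new String[] {"D"});
      }
    }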
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9adc9d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 580d0d6..5457dc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -218,4 +218,8 @@ public class BlockStorageMovementAttemptedItems {
return storageMovementAttemptedResults.size();
}
+ @VisibleForTesting
+ public int getAttemptedItemsCount() {
+ return storageMovementAttemptedItems.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9adc9d7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 4967a89..617ab2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -211,6 +211,14 @@ public class StoragePolicySatisfier implements Runnable {
}
}
+ addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos,
+ coordinatorNode);
+ }
+
+ private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
+ List<BlockMovingInfo> blockMovingInfos,
+ DatanodeDescriptor coordinatorNode) {
+
if (blockMovingInfos.size() < 1) {
// TODO: Major: handle this case. I think we need retry cases to
// be implemented. Idea is, if some files are not getting storage movement
@@ -218,6 +226,20 @@ public class StoragePolicySatisfier implements Runnable {
return;
}
+ boolean needBlockStorageMovement = false;
+ for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+ // Check that at least one block storage movement has been chosen
+ if (blkMovingInfo.getTargets().length > 0) {
+ needBlockStorageMovement = true;
+ break;
+ }
+ }
+ if (!needBlockStorageMovement) {
+ // Simply return as there are no targets selected for scheduling the
+ // block movement.
+ return;
+ }
+
// 'BlockCollectionId' is used as the tracking ID. All the blocks under this
// blockCollectionID will be added to this datanode.
coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
@@ -251,9 +273,8 @@ public class StoragePolicySatisfier implements Runnable {
List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
- StorageTypeNodePair chosenTarget =
- chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
- locsForExpectedStorageTypes, chosenNodes);
+ StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
+ existingTypeNodePair.dn, expected);
if (chosenTarget == null && blockManager.getDatanodeManager()
.getNetworkTopology().isNodeGroupAware()) {
@@ -282,15 +303,14 @@ public class StoragePolicySatisfier implements Runnable {
chosenNodes.add(chosenTarget.dn);
// TODO: We can increment scheduled block count for this node?
} else {
- // TODO: Failed to ChooseTargetNodes...So let just retry. Shall we
- // proceed without this targets? Then what should be final result?
- // How about pack empty target, means target node could not be chosen ,
- // so result should be RETRY_REQUIRED from DN always.
- // Log..unable to choose target node for source datanodeDescriptor
+ LOG.warn(
+ "Failed to choose target datanode for the required"
+ + " storage types {}, block:{}, existing storage type:{}",
+ expected, blockInfo, existingTypeNodePair.storageType);
sourceNodes.add(existingTypeNodePair.dn);
sourceStorageTypes.add(existingTypeNodePair.storageType);
- targetNodes.add(null);
- targetStorageTypes.add(null);
+ // Important: the target details are intentionally left empty. Later,
+ // this is used as an indicator for retrying this block movement.
}
}
BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
@@ -302,15 +322,13 @@ public class StoragePolicySatisfier implements Runnable {
}
/**
- * Choose the target storage within same Datanode if possible.
+ * Choose the target storage within the same datanode if possible.
*
- * @param locsForExpectedStorageTypes
- * @param chosenNodes
+ * @param source source datanode
+ * @param targetTypes list of target storage types
*/
private StorageTypeNodePair chooseTargetTypeInSameNode(
- DatanodeDescriptor source, List<StorageType> targetTypes,
- StorageTypeNodeMap locsForExpectedStorageTypes,
- List<DatanodeDescriptor> chosenNodes) {
+ DatanodeDescriptor source, List<StorageType> targetTypes) {
for (StorageType t : targetTypes) {
DatanodeStorageInfo chooseStorage4Block =
source.chooseStorage4Block(t, 0);
@@ -328,6 +346,9 @@ public class StoragePolicySatisfier implements Runnable {
for (StorageType t : targetTypes) {
List<DatanodeDescriptor> nodesWithStorages =
locsForExpectedStorageTypes.getNodesWithStorages(t);
+ if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
+ continue; // no target nodes with the required storage type.
+ }
Collections.shuffle(nodesWithStorages);
for (DatanodeDescriptor target : nodesWithStorages) {
if (!chosenNodes.contains(target) && matcher.match(
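The empty-targets convention above does double duty: per source, a missing target tells the coordinator datanode to mark that movement as failed, and per file, a task list in which no entry has any target is not dispatched at all. A minimal sketch of that dispatch guard, with a hypothetical Info type in place of BlockMovingInfo:

    import java.util.List;

    class DispatchGuard {

      // Hypothetical stand-in for BlockMovingInfo.
      static final class Info {
        final String[] targets;

        Info(String... targets) {
          this.targets = targets;
        }
      }

      // Mirrors the needBlockStorageMovement check: dispatch only when at
      // least one entry actually has a chosen target.
      static boolean shouldDispatch(List<Info> infos) {
        for (Info info : infos) {
          if (info.targets.length > 0) {
            return true;
          }
        }
        return false;
      }
    }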
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9adc9d7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 901e1ba..499fe3c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -44,8 +46,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
-
/**
* Tests that StoragePolicySatisfier daemon is able to check the blocks to be
* moved and finding its suggested target locations to move.
@@ -79,7 +79,7 @@ public class TestStoragePolicySatisfier {
throws Exception {
try {
- // Change policy to ALL_SSD
+ // Change policy to COLD
dfs.setStoragePolicy(new Path(file), "COLD");
FSNamesystem namesystem = hdfsCluster.getNamesystem();
INode inode = namesystem.getFSDirectory().getINode(file);
@@ -356,6 +356,108 @@ public class TestStoragePolicySatisfier {
}
}
+ /**
+ * Tests to verify that, for the given path, some of the blocks or block
+ * src locations (src nodes) under the given path will be scheduled for
+ * block movement.
+ *
+ * For example, there are two blocks for a file:
+ *
+ * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+ * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+ * Only one datanode is available with storage type ARCHIVE, say D.
+ *
+ * SPS will schedule block movement to the coordinator node with the details,
+ * blk_1[move A(DISK) -> D(ARCHIVE)], blk_2[move A(DISK) -> D(ARCHIVE)].
+ */
+ @Test(timeout = 300000)
+ public void testWhenOnlyFewTargetDatanodeAreAvailableToSatisfyStoragePolicy()
+ throws Exception {
+ try {
+ // Change policy to COLD
+ dfs.setStoragePolicy(new Path(file), "COLD");
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}};
+
+ // Adding ARCHIVE based datanodes.
+ startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+ // Wait till the StoragePolicySatisfier identifies the blocks to move
+ // to the ARCHIVE area.
+ waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000);
+ waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+
+ waitForBlocksMovementResult(1, 30000);
+ } finally {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ /**
+ * Tests to verify that, for the given path, no blocks or block src
+ * locations (src nodes) under the given path will be scheduled for block
+ * movement, as there are no available datanodes with the required
+ * storage type.
+ *
+ * For example, there are two blocks for a file:
+ *
+ * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
+ * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
+ * No datanode is available with storage type ARCHIVE.
+ *
+ * SPS won't schedule any block movement for this path.
+ */
+ @Test(timeout = 300000)
+ public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
+ throws Exception {
+ try {
+ // Change policy to COLD
+ dfs.setStoragePolicy(new Path(file), "COLD");
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.DISK, StorageType.DISK}};
+ // Adding DISK based datanodes
+ startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+
+ // No block movement will be scheduled as there is no target node available
+ // with the required storage type.
+ waitForAttemptedItems(1, 30000);
+ waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+ // Since there is no target node, the item will time out and then be
+ // re-attempted.
+ waitForAttemptedItems(1, 30000);
+ } finally {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ private void waitForAttemptedItems(long expectedBlkMovAttemptedCount,
+ int timeout) throws TimeoutException, InterruptedException {
+ BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+ final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
+ expectedBlkMovAttemptedCount,
+ sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
+ return sps.getAttemptedItemsMonitor()
+ .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
+ }
+ }, 100, timeout);
+ }
+
private void waitForBlocksMovementResult(long expectedBlkMovResultsCount,
int timeout) throws TimeoutException, InterruptedException {
BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
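waitForAttemptedItems() above polls through GenericTestUtils.waitFor until the satisfier's attempted-items count reaches the expected value. For readers outside the Hadoop test tree, a minimal stand-in for that polling helper might look like the following (an assumption-labeled sketch, not Hadoop's implementation):

    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    final class PollingWait {

      // Polls the condition every intervalMs until it holds or timeoutMs
      // elapses, then fails with a TimeoutException.
      static void waitFor(Supplier<Boolean> check, long intervalMs,
          long timeoutMs) throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!check.get()) {
          if (System.currentTimeMillis() > deadline) {
            throw new TimeoutException(
                "condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(intervalMs);
        }
      }
    }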