You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2016/12/29 07:22:14 UTC
hadoop git commit: HDFS-11248: [SPS]: Handle partial block location
movements. Contributed by Rakesh R
Repository: hadoop
Updated Branches:
refs/heads/HDFS-10285 c5876ee04 -> 57193f703
HDFS-11248: [SPS]: Handle partial block location movements. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/57193f70
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/57193f70
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/57193f70
Branch: refs/heads/HDFS-10285
Commit: 57193f7032c4ed771383063052df287b031cf434
Parents: c5876ee
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Wed Dec 28 23:21:07 2016 -0800
Committer: Uma Maheswara Rao G <um...@intel.com>
Committed: Wed Dec 28 23:21:07 2016 -0800
----------------------------------------------------------------------
.../datanode/BlockStorageMovementTracker.java | 15 --
.../datanode/StoragePolicySatisfyWorker.java | 15 +-
.../BlockStorageMovementAttemptedItems.java | 206 +++++++++++++-----
.../server/namenode/StoragePolicySatisfier.java | 215 +++++++++++++------
.../TestBlockStorageMovementAttemptedItems.java | 101 ++++++++-
.../namenode/TestStoragePolicySatisfier.java | 63 +++++-
6 files changed, 454 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index 2de88fc..bd35b09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,20 +108,6 @@ public class BlockStorageMovementTracker implements Runnable {
}
}
- /**
- * Mark as block movement failure for the given trackId and blockId.
- *
- * @param trackId tracking id
- * @param blockId block id
- */
- void markBlockMovementFailure(long trackId, long blockId) {
- LOG.debug("Mark as block movement failure for the given "
- + "trackId:{} and blockId:{}", trackId, blockId);
- BlockMovementResult result = new BlockMovementResult(trackId, blockId, null,
- BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
- addMovementResultToTrackIdList(result);
- }
-
private List<BlockMovementResult> addMovementResultToTrackIdList(
BlockMovementResult result) {
long trackId = result.getTrackId();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index ded010e..1648d8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -154,18 +154,9 @@ public class StoragePolicySatisfyWorker {
Collection<BlockMovingInfo> blockMovingInfos) {
LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- // Iterating backwards. This is to ensure that all the block src location
- // which doesn't have a target node will be marked as failure before
- // scheduling the block movement to valid target nodes.
- for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
- if (i >= blkMovingInfo.getTargets().length) {
- // Since there is no target selected for scheduling the block,
- // just mark this block storage movement as failure. Later, namenode
- // can take action on this.
- movementTracker.markBlockMovementFailure(trackID,
- blkMovingInfo.getBlock().getBlockId());
- continue;
- }
+ assert blkMovingInfo.getSources().length == blkMovingInfo
+ .getTargets().length;
+ for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
DatanodeInfo target = blkMovingInfo.getTargets()[i];
BlockMovingTask blockMovingTask = new BlockMovingTask(
trackID, blockPoolID, blkMovingInfo.getBlock(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index bb26082..ce97075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -43,11 +43,14 @@ import com.google.common.annotations.VisibleForTesting;
* automatically after timeout. The default timeout would be 30mins.
*/
public class BlockStorageMovementAttemptedItems {
- public static final Logger LOG =
+ private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
- // A map holds the items which are already taken for blocks movements
- // processing and sent to DNs.
- private final Map<Long, Long> storageMovementAttemptedItems;
+
+ /**
+ * A map holds the items which are already taken for blocks movements
+ * processing and sent to DNs.
+ */
+ private final Map<Long, ItemInfo> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
@@ -83,10 +86,16 @@ public class BlockStorageMovementAttemptedItems {
*
* @param blockCollectionID
* - tracking id / block collection id
+ * @param allBlockLocsAttemptedToSatisfy
+ * - true if matching target nodes were found for all the block
+ * locations of the given blockCollectionID, false otherwise
*/
- public void add(Long blockCollectionID) {
+ public void add(Long blockCollectionID,
+ boolean allBlockLocsAttemptedToSatisfy) {
synchronized (storageMovementAttemptedItems) {
- storageMovementAttemptedItems.put(blockCollectionID, monotonicNow());
+ ItemInfo itemInfo = new ItemInfo(monotonicNow(),
+ allBlockLocsAttemptedToSatisfy);
+ storageMovementAttemptedItems.put(blockCollectionID, itemInfo);
}
}
@@ -121,15 +130,62 @@ public class BlockStorageMovementAttemptedItems {
*/
public synchronized void stop() {
monitorRunning = false;
- timerThread.interrupt();
- try {
- timerThread.join(3000);
- } catch (InterruptedException ie) {
+ if (timerThread != null) {
+ timerThread.interrupt();
+ try {
+ timerThread.join(3000);
+ } catch (InterruptedException ie) {
+ }
}
this.clearQueues();
}
/**
+ * This class contains information of an attempted trackID. Information such
+ * as, (a)last attempted time stamp, (b)whether all the blocks in the trackID
+ * were attempted and blocks movement has been scheduled to satisfy storage
+ * policy. This is used by
+ * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
+ */
+ private final static class ItemInfo {
+ private final long lastAttemptedTimeStamp;
+ private final boolean allBlockLocsAttemptedToSatisfy;
+
+ /**
+ * ItemInfo constructor.
+ *
+ * @param lastAttemptedTimeStamp
+ * last attempted time stamp
+ * @param allBlockLocsAttemptedToSatisfy
+ * whether all the blocks in the trackID were attempted and blocks
+ * movement has been scheduled to satisfy storage policy
+ */
+ private ItemInfo(long lastAttemptedTimeStamp,
+ boolean allBlockLocsAttemptedToSatisfy) {
+ this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
+ this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
+ }
+
+ /**
+ * @return last attempted time stamp.
+ */
+ private long getLastAttemptedTimeStamp() {
+ return lastAttemptedTimeStamp;
+ }
+
+ /**
+ * @return true/false. A true value represents that all the block locations
+ * under the trackID have found matching target nodes to satisfy the
+ * storage policy. A false value represents that the trackID needs
+ * retries to satisfy the storage policy for some of the block
+ * locations.
+ */
+ private boolean isAllBlockLocsAttemptedToSatisfy() {
+ return allBlockLocsAttemptedToSatisfy;
+ }
+ }
+
+ /**
* A monitor class for checking block storage movement result and long waiting
* items periodically.
*/
@@ -147,76 +203,108 @@ public class BlockStorageMovementAttemptedItems {
}
}
}
+ }
- private void blocksStorageMovementUnReportedItemsCheck() {
- synchronized (storageMovementAttemptedItems) {
- Iterator<Entry<Long, Long>> iter =
- storageMovementAttemptedItems.entrySet().iterator();
- long now = monotonicNow();
- while (iter.hasNext()) {
- Entry<Long, Long> entry = iter.next();
- if (now > entry.getValue() + selfRetryTimeout) {
- Long blockCollectionID = entry.getKey();
- synchronized (storageMovementAttemptedResults) {
- boolean exist = isExistInResult(blockCollectionID);
- if (!exist) {
- blockStorageMovementNeeded.add(blockCollectionID);
- } else {
- LOG.info("Blocks storage movement results for the"
- + " tracking id : " + blockCollectionID
- + " is reported from one of the co-ordinating datanode."
- + " So, the result will be processed soon.");
- }
+ @VisibleForTesting
+ void blocksStorageMovementUnReportedItemsCheck() {
+ synchronized (storageMovementAttemptedItems) {
+ Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems
+ .entrySet().iterator();
+ long now = monotonicNow();
+ while (iter.hasNext()) {
+ Entry<Long, ItemInfo> entry = iter.next();
+ ItemInfo itemInfo = entry.getValue();
+ if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) {
+ Long blockCollectionID = entry.getKey();
+ synchronized (storageMovementAttemptedResults) {
+ if (!isExistInResult(blockCollectionID)) {
+ blockStorageMovementNeeded.add(blockCollectionID);
iter.remove();
+ LOG.info("TrackID: {} becomes timed out and moved to needed "
+ + "retries queue for next iteration.", blockCollectionID);
+ } else {
+ LOG.info("Blocks storage movement results for the"
+ + " tracking id : " + blockCollectionID
+ + " is reported from one of the co-ordinating datanode."
+ + " So, the result will be processed soon.");
}
}
}
-
}
+
}
+ }
- private boolean isExistInResult(Long blockCollectionID) {
- Iterator<BlocksStorageMovementResult> iter =
- storageMovementAttemptedResults.iterator();
- while (iter.hasNext()) {
- BlocksStorageMovementResult storageMovementAttemptedResult =
- iter.next();
- if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
- return true;
- }
+ private boolean isExistInResult(Long blockCollectionID) {
+ Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
+ .iterator();
+ while (iter.hasNext()) {
+ BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
+ if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
+ return true;
}
- return false;
}
+ return false;
+ }
- private void blockStorageMovementResultCheck() {
- synchronized (storageMovementAttemptedResults) {
- Iterator<BlocksStorageMovementResult> iter =
- storageMovementAttemptedResults.iterator();
- while (iter.hasNext()) {
- BlocksStorageMovementResult storageMovementAttemptedResult =
- iter.next();
+ @VisibleForTesting
+ void blockStorageMovementResultCheck() {
+ synchronized (storageMovementAttemptedResults) {
+ Iterator<BlocksStorageMovementResult> resultsIter =
+ storageMovementAttemptedResults.iterator();
+ while (resultsIter.hasNext()) {
+ // TrackID need to be retried in the following cases:
+ // 1) All or a few of the scheduled block movements have failed.
+ // 2) All the scheduled block movements have succeeded but there
+ // are unscheduled block movements in this trackID. Say, some of
+ // the blocks in the trackID couldn't find any matching target node
+ // for scheduling block movement in the previous SPS iteration.
+ BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
+ .next();
+ synchronized (storageMovementAttemptedItems) {
if (storageMovementAttemptedResult
.getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
blockStorageMovementNeeded
.add(storageMovementAttemptedResult.getTrackId());
- LOG.warn("Blocks storage movement results for the tracking id : "
- + storageMovementAttemptedResult.getTrackId()
+ LOG.warn("Blocks storage movement results for the tracking id: {}"
+ " is reported from co-ordinating datanode, but result"
- + " status is FAILURE. So, added for retry");
+ + " status is FAILURE. So, added for retry",
+ storageMovementAttemptedResult.getTrackId());
} else {
- synchronized (storageMovementAttemptedItems) {
- storageMovementAttemptedItems
- .remove(storageMovementAttemptedResult.getTrackId());
+ ItemInfo itemInfo = storageMovementAttemptedItems
+ .get(storageMovementAttemptedResult.getTrackId());
+
+ // ItemInfo could be null. One case is when the attempted trackID
+ // timed out before the blocks movement result arrived, and was
+ // therefore removed from the storageMovementAttemptedItems list.
+ // TODO: Need to ensure that trackID is added to the
+ // 'blockStorageMovementNeeded' queue for retries to handle the
+ // following condition. If all the block locations under the trackID
+ // are attempted and failed to find matching target nodes to satisfy
+ // storage policy in previous SPS iteration.
+ if (itemInfo != null
+ && !itemInfo.isAllBlockLocsAttemptedToSatisfy()) {
+ blockStorageMovementNeeded
+ .add(storageMovementAttemptedResult.getTrackId());
+ LOG.warn("Blocks storage movement is SUCCESS for the track id: {}"
+ + " reported from co-ordinating datanode. But adding trackID"
+ + " back to retry queue as some of the blocks couldn't find"
+ + " matching target nodes in previous SPS iteration.",
+ storageMovementAttemptedResult.getTrackId());
+ } else {
+ LOG.info("Blocks storage movement is SUCCESS for the track id: {}"
+ + " reported from co-ordinating datanode. But the trackID "
+ + "doesn't exists in storageMovementAttemptedItems list",
+ storageMovementAttemptedResult.getTrackId());
}
- LOG.info("Blocks storage movement results for the tracking id : "
- + storageMovementAttemptedResult.getTrackId()
- + " is reported from co-ordinating datanode. "
- + "The result status is SUCCESS.");
}
- iter.remove(); // remove from results as processed above
+ // Remove trackID from the attempted list, if any.
+ storageMovementAttemptedItems
+ .remove(storageMovementAttemptedResult.getTrackId());
}
+ // Remove trackID from results as processed above.
+ resultsIter.remove();
}
-
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 56a531f..26e0775 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -162,8 +162,15 @@ public class StoragePolicySatisfier implements Runnable {
try {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
- computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
- this.storageMovementsMonitor.add(blockCollectionID);
+ BlockCollection blockCollection =
+ namesystem.getBlockCollection(blockCollectionID);
+ // Check blockCollectionId existence.
+ if (blockCollection != null) {
+ boolean allBlockLocsAttemptedToSatisfy =
+ computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
+ this.storageMovementsMonitor.add(blockCollectionID,
+ allBlockLocsAttemptedToSatisfy);
+ }
}
// TODO: We can think to make this as configurable later, how frequently
// we want to check block movements.
@@ -192,20 +199,17 @@ public class StoragePolicySatisfier implements Runnable {
}
}
- private void computeAndAssignStorageMismatchedBlocksToDNs(
- long blockCollectionID) {
- BlockCollection blockCollection =
- namesystem.getBlockCollection(blockCollectionID);
- if (blockCollection == null) {
- return;
- }
+ private boolean computeAndAssignStorageMismatchedBlocksToDNs(
+ BlockCollection blockCollection) {
byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
BlockStoragePolicy existingStoragePolicy =
blockManager.getStoragePolicy(existingStoragePolicyID);
if (!blockCollection.getLastBlock().isComplete()) {
// Postpone, currently file is under construction
// So, should we add back? or leave it to user
- return;
+ LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+ + " this to the next retry iteration", blockCollection.getId());
+ return true;
}
// First datanode will be chosen as the co-ordinator node for storage
@@ -213,61 +217,87 @@ public class StoragePolicySatisfier implements Runnable {
DatanodeDescriptor coordinatorNode = null;
BlockInfo[] blocks = blockCollection.getBlocks();
List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
+
+ // True value represents that, SPS is able to find matching target nodes
+ // to satisfy storage type for all the blocks locations of the given
+ // blockCollection. A false value represents that, blockCollection needed
+ // retries to satisfy the storage policy for some of the block locations.
+ boolean foundMatchingTargetNodesForAllBlocks = true;
+
for (int i = 0; i < blocks.length; i++) {
BlockInfo blockInfo = blocks[i];
- List<StorageType> expectedStorageTypes =
- existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
- DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
- StorageType[] storageTypes = new StorageType[storages.length];
- for (int j = 0; j < storages.length; j++) {
- DatanodeStorageInfo datanodeStorageInfo = storages[j];
- StorageType storageType = datanodeStorageInfo.getStorageType();
- storageTypes[j] = storageType;
- }
- List<StorageType> existing =
- new LinkedList<StorageType>(Arrays.asList(storageTypes));
- if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
- existing, true)) {
- List<StorageTypeNodePair> sourceWithStorageMap =
- new ArrayList<StorageTypeNodePair>();
- List<DatanodeStorageInfo> existingBlockStorages =
- new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
- for (StorageType existingType : existing) {
- Iterator<DatanodeStorageInfo> iterator =
- existingBlockStorages.iterator();
- while (iterator.hasNext()) {
- DatanodeStorageInfo datanodeStorageInfo = iterator.next();
- StorageType storageType = datanodeStorageInfo.getStorageType();
- if (storageType == existingType) {
- iterator.remove();
- sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
- datanodeStorageInfo.getDatanodeDescriptor()));
- break;
- }
- }
- }
+ List<StorageType> expectedStorageTypes = existingStoragePolicy
+ .chooseStorageTypes(blockInfo.getReplication());
+ foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos(
+ blockMovingInfos, blockInfo, expectedStorageTypes);
+ }
- StorageTypeNodeMap locsForExpectedStorageTypes =
- findTargetsForExpectedStorageTypes(expectedStorageTypes);
-
- BlockMovingInfo blockMovingInfo =
- findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
- expectedStorageTypes, locsForExpectedStorageTypes);
- if (coordinatorNode == null) {
- // For now, first datanode will be chosen as the co-ordinator. Later
- // this can be optimized if needed.
- coordinatorNode =
- (DatanodeDescriptor) blockMovingInfo.getSources()[0];
+ assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
+ blockMovingInfos, coordinatorNode);
+ return foundMatchingTargetNodesForAllBlocks;
+ }
+
+ /**
+ * Compute the list of block moving information corresponding to the given
+ * block. This will check whether each block location of the given block
+ * satisfies the expected storage policy. If a block location does not satisfy
+ * the policy, then find a target node with the expected storage type to
+ * satisfy the storage policy.
+ *
+ * @param blockMovingInfos
+ * - list of block source and target node pair
+ * @param blockInfo
+ * - block details
+ * @param expectedStorageTypes
+ * - list of expected storage type to satisfy the storage policy
+ * @return false if some of the block locations failed to find target node to
+ * satisfy the storage policy, true otherwise
+ */
+ private boolean computeBlockMovingInfos(
+ List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+ List<StorageType> expectedStorageTypes) {
+ boolean foundMatchingTargetNodesForBlock = true;
+ DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+ StorageType[] storageTypes = new StorageType[storages.length];
+ for (int j = 0; j < storages.length; j++) {
+ DatanodeStorageInfo datanodeStorageInfo = storages[j];
+ StorageType storageType = datanodeStorageInfo.getStorageType();
+ storageTypes[j] = storageType;
+ }
+ List<StorageType> existing =
+ new LinkedList<StorageType>(Arrays.asList(storageTypes));
+ if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+ existing, true)) {
+ List<StorageTypeNodePair> sourceWithStorageMap =
+ new ArrayList<StorageTypeNodePair>();
+ List<DatanodeStorageInfo> existingBlockStorages =
+ new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+ for (StorageType existingType : existing) {
+ Iterator<DatanodeStorageInfo> iterator =
+ existingBlockStorages.iterator();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+ StorageType storageType = datanodeStorageInfo.getStorageType();
+ if (storageType == existingType) {
+ iterator.remove();
+ sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+ datanodeStorageInfo.getDatanodeDescriptor()));
+ break;
+ }
}
- blockMovingInfos.add(blockMovingInfo);
}
- }
- addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos,
- coordinatorNode);
+ StorageTypeNodeMap locsForExpectedStorageTypes =
+ findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+ foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
+ blockMovingInfos, blockInfo, existing, sourceWithStorageMap,
+ expectedStorageTypes, locsForExpectedStorageTypes);
+ }
+ return foundMatchingTargetNodesForBlock;
}
- private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
+ private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
List<BlockMovingInfo> blockMovingInfos,
DatanodeDescriptor coordinatorNode) {
@@ -278,6 +308,11 @@ public class StoragePolicySatisfier implements Runnable {
return;
}
+ // For now, first datanode will be chosen as the co-ordinator. Later
+ // this can be optimized if needed.
+ coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
+ .getSources()[0];
+
boolean needBlockStorageMovement = false;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for atleast one block storage movement has been chosen
@@ -301,6 +336,8 @@ public class StoragePolicySatisfier implements Runnable {
* Find the good target node for each source node for which block storages was
* misplaced.
*
+ * @param blockMovingInfos
+ * - list of block source and target node pair
* @param blockInfo
* - Block
* @param existing
@@ -311,23 +348,49 @@ public class StoragePolicySatisfier implements Runnable {
* - Expecting storages to move
* @param locsForExpectedStorageTypes
* - Available DNs for expected storage types
- * @return list of block source and target node pair
+ * @return false if some of the block locations failed to find target node to
+ * satisfy the storage policy
*/
- private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
+ private boolean findSourceAndTargetToMove(
+ List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
List<StorageType> existing,
List<StorageTypeNodePair> sourceWithStorageList,
List<StorageType> expected,
StorageTypeNodeMap locsForExpectedStorageTypes) {
+ boolean foundMatchingTargetNodesForBlock = true;
List<DatanodeInfo> sourceNodes = new ArrayList<>();
List<StorageType> sourceStorageTypes = new ArrayList<>();
List<DatanodeInfo> targetNodes = new ArrayList<>();
List<StorageType> targetStorageTypes = new ArrayList<>();
List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+
+ // Looping over all the source node locations and choose the target
+ // storage within same node if possible. This is done separately to
+ // avoid choosing a target which already has this block.
for (int i = 0; i < sourceWithStorageList.size(); i++) {
StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
existingTypeNodePair.dn, expected);
+ if (chosenTarget != null) {
+ sourceNodes.add(existingTypeNodePair.dn);
+ sourceStorageTypes.add(existingTypeNodePair.storageType);
+ targetNodes.add(chosenTarget.dn);
+ targetStorageTypes.add(chosenTarget.storageType);
+ chosenNodes.add(chosenTarget.dn);
+ // TODO: We can increment scheduled block count for this node?
+ }
+ }
+ // Looping over all the source node locations. Choose a remote target
+ // storage node if it was not found out within same node.
+ for (int i = 0; i < sourceWithStorageList.size(); i++) {
+ StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+ StorageTypeNodePair chosenTarget = null;
+ // A target storage was already chosen within the same datanode, so skip
+ // this source node.
+ if (sourceNodes.contains(existingTypeNodePair.dn)) {
+ continue;
+ }
if (chosenTarget == null && blockManager.getDatanodeManager()
.getNetworkTopology().isNodeGroupAware()) {
chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
@@ -359,18 +422,40 @@ public class StoragePolicySatisfier implements Runnable {
"Failed to choose target datanode for the required"
+ " storage types {}, block:{}, existing storage type:{}",
expected, blockInfo, existingTypeNodePair.storageType);
- sourceNodes.add(existingTypeNodePair.dn);
- sourceStorageTypes.add(existingTypeNodePair.storageType);
- // Imp: Not setting the target details, empty targets. Later, this is
- // used as an indicator for retrying this block movement.
+ foundMatchingTargetNodesForBlock = false;
}
}
- BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
+
+ blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
+ sourceStorageTypes, targetNodes, targetStorageTypes));
+ return foundMatchingTargetNodesForBlock;
+ }
+
+ private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
+ List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+ List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
+ List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
+ // No source-target node pair exists.
+ if (sourceNodes.size() <= 0) {
+ return blkMovingInfos;
+ }
+ buildBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
+ targetNodes, targetStorageTypes, blkMovingInfos);
+ return blkMovingInfos;
+ }
+
+ private void buildBlockMovingInfos(BlockInfo blockInfo,
+ List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+ List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+ List<BlockMovingInfo> blkMovingInfos) {
+ Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
+ blockInfo.getGenerationStamp());
+ BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk,
sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
- return blkMovingInfo;
+ blkMovingInfos.add(blkMovingInfo);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 8c70d99..6641134 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -33,13 +33,13 @@ public class TestBlockStorageMovementAttemptedItems {
private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+ private final int selfRetryTimeout = 500;
@Before
public void setup() {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
- bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500,
- unsatisfiedStorageMovementFiles);
- bsmAttemptedItems.start();
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
+ selfRetryTimeout, unsatisfiedStorageMovementFiles);
}
@After
@@ -72,8 +72,9 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testAddResultWithFailureResult() throws Exception {
+ bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- bsmAttemptedItems.add(item);
+ bsmAttemptedItems.add(item, true);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@@ -82,8 +83,9 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testAddResultWithSucessResult() throws Exception {
+ bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- bsmAttemptedItems.add(item);
+ bsmAttemptedItems.add(item, true);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -92,10 +94,93 @@ public class TestBlockStorageMovementAttemptedItems {
@Test(timeout = 30000)
public void testNoResultAdded() throws Exception {
+ bsmAttemptedItems.start(); // start block movement result monitor thread
Long item = new Long(1234);
- bsmAttemptedItems.add(item);
- // After selfretry timeout, it should be added back for retry
- assertTrue(checkItemMovedForRetry(item, 600));
+ bsmAttemptedItems.add(item, true);
+ // After self retry timeout, it should be added back for retry
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(item, 600));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
}
+ /**
+ * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
+ * #blockStorageMovementResultCheck() occurs first, followed by
+ * #blocksStorageMovementUnReportedItemsCheck().
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementShouldBeRetried1() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.add(item, false);
+ bsmAttemptedItems.addResults(
+ new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+ item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+
+ // start block movement result monitor thread
+ bsmAttemptedItems.start();
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(item, 5000));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+
+ /**
+ * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
+ * #blocksStorageMovementUnReportedItemsCheck() occurs first, followed by
+ * #blockStorageMovementResultCheck().
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementShouldBeRetried2() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.add(item, false);
+ bsmAttemptedItems.addResults(
+ new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+ item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+
+ Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
+
+ bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
+ bsmAttemptedItems.blockStorageMovementResultCheck();
+
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(item, 5000));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+
+ /**
+ * Partial block movement with only a BlocksStorageMovementResult#FAILURE
+ * result, while the storageMovementAttemptedItems list is empty.
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementShouldBeRetried3() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.addResults(
+ new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+ item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+ bsmAttemptedItems.blockStorageMovementResultCheck();
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(item, 5000));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+
+ /**
+ * Partial block movement with BlocksStorageMovementResult#FAILURE result and
+ * storageMovementAttemptedItems.
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementShouldBeRetried4() throws Exception {
+ Long item = new Long(1234);
+ bsmAttemptedItems.add(item, false);
+ bsmAttemptedItems.addResults(
+ new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+ item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+ bsmAttemptedItems.blockStorageMovementResultCheck();
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(item, 5000));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/57193f70/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 179b66b..718dbcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -527,6 +527,59 @@ public class TestStoragePolicySatisfier {
waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
}
+ /**
+ * Tests to verify that, for the given path, only a few of the blocks or
+ * block src locations (src nodes) under that path will be scheduled for
+ * block movement.
+ *
+ * For example, there are two blocks for a file:
+ *
+ * File1 => two blocks and default storage policy(HOT).
+ * blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)],
+ * blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)].
+ *
+ * Now, set storage policy to COLD.
+ * Only two datanodes have the expected storage type ARCHIVE, say A and E.
+ *
+ * SPS will schedule block movement to the coordinator node with the details,
+ * blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)],
+ * blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)].
+ */
+ @Test(timeout = 300000)
+ public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
+ throws Exception {
+ try {
+ int numOfDns = 5;
+ config.setLong("dfs.block.size", 1024);
+ allDiskTypes =
+ new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.ARCHIVE}};
+ hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
+ storagesPerDatanode, capacity);
+ dfs = hdfsCluster.getFileSystem();
+ writeContent(file, (short) 5);
+
+ // Change policy to COLD
+ dfs.setStoragePolicy(new Path(file), "COLD");
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+ // Wait till StoragePolicySatisfier identifies the blocks to move to the
+ // ARCHIVE area.
+ waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
+ waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+
+ waitForBlocksMovementResult(1, 30000);
+ } finally {
+ shutdownCluster();
+ }
+ }
+
private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
throws IOException {
ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
@@ -561,7 +614,7 @@ public class TestStoragePolicySatisfier {
DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
favoredNodesCount--;
if (favoredNodesCount <= 0) {
- break;// marked favoredNodesCount number of pinned block location
+ break; // marked favoredNodesCount number of pinned block location
}
}
return file1;
@@ -600,8 +653,14 @@ public class TestStoragePolicySatisfier {
}
private void writeContent(final String fileName) throws IOException {
+ writeContent(fileName, (short) 3);
+ }
+
+ private void writeContent(final String fileName, short replicatonFactor)
+ throws IOException {
// write to DISK
- final FSDataOutputStream out = dfs.create(new Path(fileName));
+ final FSDataOutputStream out = dfs.create(new Path(fileName),
+ replicatonFactor);
for (int i = 0; i < 1000; i++) {
out.writeChars("t");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org