Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2018/02/16 14:40:17 UTC
[26/50] [abbrv] hadoop git commit: HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.
HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1abca8dd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1abca8dd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1abca8dd
Branch: refs/heads/HDFS-10285
Commit: 1abca8dd495adb009adac15bcf6dd07fd6c45437
Parents: f6adbb3
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Thu Oct 12 17:17:51 2017 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Fri Feb 16 19:47:30 2018 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +-
.../DatanodeProtocolClientSideTranslatorPB.java | 12 +-
.../DatanodeProtocolServerSideTranslatorPB.java | 4 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 150 +++-----
.../blockmanagement/DatanodeDescriptor.java | 50 ++-
.../server/blockmanagement/DatanodeManager.java | 104 ++++--
.../hdfs/server/datanode/BPOfferService.java | 3 +-
.../hdfs/server/datanode/BPServiceActor.java | 33 +-
.../datanode/BlockStorageMovementTracker.java | 80 ++---
.../datanode/StoragePolicySatisfyWorker.java | 214 ++++--------
.../BlockStorageMovementAttemptedItems.java | 299 ++++------------
.../BlockStorageMovementInfosBatch.java | 61 ----
.../hdfs/server/namenode/FSNamesystem.java | 11 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 7 +-
.../server/namenode/StoragePolicySatisfier.java | 343 ++++++++++---------
.../protocol/BlockStorageMovementCommand.java | 99 ++----
.../BlocksStorageMoveAttemptFinished.java | 48 +++
.../protocol/BlocksStorageMovementResult.java | 74 ----
.../hdfs/server/protocol/DatanodeProtocol.java | 5 +-
.../src/main/proto/DatanodeProtocol.proto | 30 +-
.../src/main/resources/hdfs-default.xml | 21 +-
.../src/site/markdown/ArchivalStorage.md | 6 +-
.../TestNameNodePrunesMissingStorages.java | 5 +-
.../datanode/InternalDataNodeTestUtils.java | 4 +-
.../server/datanode/TestBPOfferService.java | 4 +-
.../hdfs/server/datanode/TestBlockRecovery.java | 4 +-
.../server/datanode/TestDataNodeLifeline.java | 6 +-
.../TestDatanodeProtocolRetryPolicy.java | 4 +-
.../server/datanode/TestFsDatasetCache.java | 4 +-
.../TestStoragePolicySatisfyWorker.java | 52 ++-
.../hdfs/server/datanode/TestStorageReport.java | 4 +-
.../server/namenode/NNThroughputBenchmark.java | 6 +-
.../hdfs/server/namenode/NameNodeAdapter.java | 4 +-
.../TestBlockStorageMovementAttemptedItems.java | 145 ++++----
.../hdfs/server/namenode/TestDeadDatanode.java | 4 +-
.../namenode/TestStoragePolicySatisfier.java | 115 ++++++-
...stStoragePolicySatisfierWithStripedFile.java | 20 +-
37 files changed, 908 insertions(+), 1135 deletions(-)
----------------------------------------------------------------------
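At a high level, this patch changes the heartbeat payload shape: instead of an array of per-trackID BlocksStorageMovementResult entries (trackID plus a SUCCESS/FAILURE/IN_PROGRESS status), the datanode now reports a single BlocksStorageMoveAttemptFinished carrying the blocks whose movement was attempted. A minimal stand-in sketch of the new payload type, with deliberately simplified types (the real class wraps Block[] and lives in org.apache.hadoop.hdfs.server.protocol):

    // Stand-in sketch only; long[] replaces Block[] for illustration.
    public class BlocksStorageMoveAttemptFinishedSketch {
      private final long[] attemptedBlockIds;

      public BlocksStorageMoveAttemptFinishedSketch(long[] attemptedBlockIds) {
        this.attemptedBlockIds = attemptedBlockIds;
      }

      // Mirrors the getBlocks() accessor PBHelper uses to build the proto.
      public long[] getBlocks() {
        return attemptedBlockIds;
      }
    }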
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0563ad9..5e96f04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -618,11 +618,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.recheck.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =
- 5 * 60 * 1000;
+ 1 * 60 * 1000;
public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.self.retry.timeout.millis";
public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT =
- 20 * 60 * 1000;
+ 5 * 60 * 1000;
+ public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY =
+ "dfs.storage.policy.satisfier.low.max-streams.preference";
+ public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT =
+ false;
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 9dd87d0..dcc0705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -140,7 +140,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
- BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+ BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+ throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -165,8 +166,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
// Adding blocks movement results to the heart beat request.
- builder.addAllBlksMovementResults(
- PBHelper.convertBlksMovResults(blksMovementResults));
+ if (storageMovementFinishedBlks != null
+ && storageMovementFinishedBlks.getBlocks() != null) {
+ builder.setStorageMoveAttemptFinishedBlks(
+ PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
+ }
HeartbeatResponseProto resp;
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 40458ef..b5bb80a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -123,8 +123,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
- PBHelper.convertBlksMovResults(
- request.getBlksMovementResultsList()));
+ PBHelper.convertBlksMovReport(
+ request.getStorageMoveAttemptFinishedBlks()));
} catch (IOException e) {
throw new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 996b986..38f72c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -56,11 +57,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
@@ -104,8 +105,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,59 +971,27 @@ public class PBHelper {
return SlowDiskReports.create(slowDisksMap);
}
- public static BlocksStorageMovementResult[] convertBlksMovResults(
- List<BlocksStorageMovementResultProto> protos) {
- BlocksStorageMovementResult[] results =
- new BlocksStorageMovementResult[protos.size()];
- for (int i = 0; i < protos.size(); i++) {
- BlocksStorageMovementResultProto resultProto = protos.get(i);
- BlocksStorageMovementResult.Status status;
- switch (resultProto.getStatus()) {
- case SUCCESS:
- status = Status.SUCCESS;
- break;
- case FAILURE:
- status = Status.FAILURE;
- break;
- case IN_PROGRESS:
- status = Status.IN_PROGRESS;
- break;
- default:
- throw new AssertionError("Unknown status: " + resultProto.getStatus());
- }
- results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
- status);
+ public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
+ BlocksStorageMoveAttemptFinishedProto proto) {
+
+ List<BlockProto> blocksList = proto.getBlocksList();
+ Block[] blocks = new Block[blocksList.size()];
+ for (int i = 0; i < blocksList.size(); i++) {
+ BlockProto blkProto = blocksList.get(i);
+ blocks[i] = PBHelperClient.convert(blkProto);
}
- return results;
+ return new BlocksStorageMoveAttemptFinished(blocks);
}
- public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
- BlocksStorageMovementResult[] blocksMovementResults) {
- List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
- new ArrayList<>();
- BlocksStorageMovementResultProto.Builder builder =
- BlocksStorageMovementResultProto.newBuilder();
- for (int i = 0; i < blocksMovementResults.length; i++) {
- BlocksStorageMovementResult report = blocksMovementResults[i];
- builder.setTrackID(report.getTrackId());
- BlocksStorageMovementResultProto.Status status;
- switch (report.getStatus()) {
- case SUCCESS:
- status = BlocksStorageMovementResultProto.Status.SUCCESS;
- break;
- case FAILURE:
- status = BlocksStorageMovementResultProto.Status.FAILURE;
- break;
- case IN_PROGRESS:
- status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
- break;
- default:
- throw new AssertionError("Unknown status: " + report.getStatus());
- }
- builder.setStatus(status);
- blocksMovementResultsProto.add(builder.build());
+ public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
+ BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
+ BlocksStorageMoveAttemptFinishedProto.Builder builder =
+ BlocksStorageMoveAttemptFinishedProto.newBuilder();
+ Block[] blocks = blocksMoveAttemptFinished.getBlocks();
+ for (Block block : blocks) {
+ builder.addBlocks(PBHelperClient.convert(block));
}
- return blocksMovementResultsProto;
+ return builder.build();
}
public static JournalInfo convert(JournalInfoProto info) {
@@ -1211,34 +1179,34 @@ public class PBHelper {
BlockStorageMovementCommandProto.Builder builder =
BlockStorageMovementCommandProto.newBuilder();
- builder.setTrackID(blkStorageMovementCmd.getTrackID());
builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
.getBlockMovingTasks();
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- builder.addBlockStorageMovement(
- convertBlockMovingInfo(blkMovingInfo));
+ builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
}
return builder.build();
}
- private static BlockStorageMovementProto convertBlockMovingInfo(
+ private static BlockMovingInfoProto convertBlockMovingInfo(
BlockMovingInfo blkMovingInfo) {
- BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+ BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
.newBuilder();
builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));
- DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
- builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+ DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
+ builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));
- DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
- builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+ DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
+ builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));
- StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
- builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+ StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+ builder.setSourceStorageType(
+ PBHelperClient.convertStorageType(sourceStorageType));
- StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
- builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+ StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+ builder.setTargetStorageType(
+ PBHelperClient.convertStorageType(targetStorageType));
return builder.build();
}
@@ -1246,42 +1214,38 @@ public class PBHelper {
private static DatanodeCommand convert(
BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- List<BlockStorageMovementProto> blkSPSatisfyList =
- blkStorageMovementCmdProto.getBlockStorageMovementList();
- for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+ List<BlockMovingInfoProto> blkSPSatisfyList =
+ blkStorageMovementCmdProto.getBlockMovingInfoList();
+ for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
}
return new BlockStorageMovementCommand(
DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
- blkStorageMovementCmdProto.getTrackID(),
blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
}
private static BlockMovingInfo convertBlockMovingInfo(
- BlockStorageMovementProto blockStoragePolicySatisfyProto) {
- BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+ BlockMovingInfoProto blockStorageMovingInfoProto) {
+ BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
Block block = PBHelperClient.convert(blockProto);
- DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
- .getSourceDnInfos();
- DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
-
- DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
- .getTargetDnInfos();
- DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
-
- StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
- .getSourceStorageTypes();
- StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
- srcStorageTypesProto.getStorageTypesList(),
- srcStorageTypesProto.getStorageTypesList().size());
-
- StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
- .getTargetStorageTypes();
- StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
- targetStorageTypesProto.getStorageTypesList(),
- targetStorageTypesProto.getStorageTypesList().size());
- return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
- srcStorageTypes, targetStorageTypes);
+ DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
+ .getSourceDnInfo();
+ DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
+
+ DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
+ .getTargetDnInfo();
+ DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
+ StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
+ .getSourceStorageType();
+ StorageType srcStorageType = PBHelperClient
+ .convertStorageType(srcStorageTypeProto);
+
+ StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
+ .getTargetStorageType();
+ StorageType targetStorageType = PBHelperClient
+ .convertStorageType(targetStorageTypeProto);
+ return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
+ srcStorageType, targetStorageType);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 0311b02..f9a76b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -212,7 +211,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
* A queue of blocks corresponding to trackID for moving its storage
* placements by this datanode.
*/
- private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
+ private final Queue<BlockMovingInfo> storageMovementBlocks =
new LinkedList<>();
private volatile boolean dropSPSWork = false;
@@ -1079,30 +1078,45 @@ public class DatanodeDescriptor extends DatanodeInfo {
/**
* Add the block infos which needs to move its storage locations.
*
- * @param trackID
- * - unique identifier which will be used for tracking the given set
- * of blocks movement completion.
- * @param storageMismatchedBlocks
- * - storage mismatched block infos
+ * @param blkMovingInfo
+ * - storage mismatched block info
*/
- public void addBlocksToMoveStorage(long trackID,
- List<BlockMovingInfo> storageMismatchedBlocks) {
+ public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
synchronized (storageMovementBlocks) {
- storageMovementBlocks.offer(
- new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks));
+ storageMovementBlocks.offer(blkMovingInfo);
}
}
/**
- * @return block infos which needs to move its storage locations. This returns
- * list of blocks under one trackId.
+ * Return the number of blocks queued up for movement.
*/
- public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
+ public int getNumberOfBlocksToMoveStorages() {
+ return storageMovementBlocks.size();
+ }
+
+ /**
+ * Get the blocks to move to satisfy the storage media type.
+ *
+ * @param numBlocksToMoveTasks
+ * total number of blocks that will be sent to this datanode for
+ * block movement.
+ *
+ * @return block infos which need to move their storage locations.
+ */
+ public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
synchronized (storageMovementBlocks) {
- // TODO: Presently returning the list of blocks under one trackId.
- // Need to limit the list of items into small batches with in trackId
- // itself if blocks are many(For example: a file contains many blocks).
- return storageMovementBlocks.poll();
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ for (; !storageMovementBlocks.isEmpty()
+ && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
+ blockMovingInfos.add(storageMovementBlocks.poll());
+ }
+ BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
+ .size()];
+ blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
+ if (blkMoveArray.length > 0) {
+ return blkMoveArray;
+ }
+ return null;
}
}
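The new getBlocksToMoveStorages(int) above drains at most numBlocksToMoveTasks entries from the queue and returns null when nothing is pending, so the caller can skip building a command. A self-contained sketch of that bounded-drain pattern, with illustrative names (BoundedDrainQueue is not an HDFS class):

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    class BoundedDrainQueue<T> {
      private final Queue<T> queue = new LinkedList<>();

      synchronized void offer(T task) {
        queue.offer(task);
      }

      // Poll at most 'limit' tasks; null signals "no pending work" so the
      // caller can avoid creating an empty DatanodeCommand.
      synchronized List<T> drain(int limit) {
        List<T> tasks = new ArrayList<>();
        while (!queue.isEmpty() && limit-- > 0) {
          tasks.add(queue.poll());
        }
        return tasks.isEmpty() ? null : tasks;
      }
    }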
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6187b37..ae5fe22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -208,6 +208,8 @@ public class DatanodeManager {
*/
private final long timeBetweenResendingCachingDirectivesMs;
+ private final boolean blocksToMoveShareEqualRatio;
+
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -332,6 +334,12 @@ public class DatanodeManager {
this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
+
+ // SPS configuration deciding whether blocks-to-move tasks may share an
+ // equal ratio of maxTransfers with pending replica/EC reconstruction tasks.
+ blocksToMoveShareEqualRatio = conf.getBoolean(
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@@ -1094,13 +1102,14 @@ public class DatanodeManager {
// Sets dropSPSWork flag to true, to ensure that
// DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
// response immediately after the node registration. This is
- // to avoid a situation, where multiple trackId responses coming from
- // different co-odinator datanodes. After SPS monitor time out, it
- // will retry the files which were scheduled to the disconnected(for
- // long time more than heartbeat expiry) DN, by finding new
- // co-ordinator datanode. Now, if the expired datanode reconnects back
- // after SPS reschedules, it leads to get different movement results
- // from reconnected and new DN co-ordinators.
+ // to avoid a situation where multiple "block attempt finished"
+ // responses come from different datanodes. After the SPS monitor
+ // times out, it retries the files which were scheduled to the
+ // disconnected (longer than heartbeat expiry) DN by finding a new
+ // datanode. If the expired datanode reconnects after SPS has
+ // rescheduled, the namenode would otherwise receive conflicting
+ // movement attempt finished reports from both the reconnected and
+ // the newly assigned datanode.
nodeS.setDropSPSWork(true);
// resolve network location
@@ -1680,19 +1689,47 @@ public class DatanodeManager {
final List<DatanodeCommand> cmds = new ArrayList<>();
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
- // replication and erasure-coded block queues.
+ // replication, erasure-coded block queues and block storage movement
+ // queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+ int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
- if (totalBlocks > 0) {
- int numReplicationTasks = (int) Math.ceil(
- (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
- int numECTasks = (int) Math.ceil(
- (double) (totalECBlocks * maxTransfers) / totalBlocks);
-
+ if (totalBlocks > 0 || totalBlocksToMove > 0) {
+ int numReplicationTasks = 0;
+ int numECTasks = 0;
+ int numBlocksToMoveTasks = 0;
+ // If blocksToMoveShareEqualRatio is true, share maxTransfers equally
+ // across replication, erasure-coded and blocks-to-move tasks.
+ // Otherwise, give priority to the pending replica/erasure-coded tasks
+ // and use only the leftover streams for blocks-to-move tasks.
+ if (blocksToMoveShareEqualRatio) {
+ // add blocksToMove count to total blocks so that it gets an equal share
+ totalBlocks = totalBlocks + totalBlocksToMove;
+ numReplicationTasks = (int) Math
+ .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+ numECTasks = (int) Math
+ .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+ numBlocksToMoveTasks = (int) Math
+ .ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
+ } else {
+ // Calculate the replica and EC tasks first, then assign blocksToMove
+ // only if any streams are still available.
+ numReplicationTasks = (int) Math
+ .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+ numECTasks = (int) Math
+ .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+ int numTasks = numReplicationTasks + numECTasks;
+ if (numTasks < maxTransfers) {
+ int remainingMaxTransfers = maxTransfers - numTasks;
+ numBlocksToMoveTasks = Math.min(totalBlocksToMove,
+ remainingMaxTransfers);
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
- + " erasure-coded tasks: " + numECTasks);
+ + " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
+ + numBlocksToMoveTasks);
}
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@@ -1708,6 +1745,23 @@ public class DatanodeManager {
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
+ // check pending block storage movement tasks
+ if (nodeinfo.shouldDropSPSWork()) {
+ cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+ // Set back to false to indicate that the new value has been sent to the
+ // datanode.
+ nodeinfo.setDropSPSWork(false);
+ } else {
+ // Get pending block storage movement tasks
+ BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
+ .getBlocksToMoveStorages(numBlocksToMoveTasks);
+
+ if (blkStorageMovementInfos != null) {
+ cmds.add(new BlockStorageMovementCommand(
+ DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
+ Arrays.asList(blkStorageMovementInfos)));
+ }
+ }
}
// check block invalidation
@@ -1751,24 +1805,6 @@ public class DatanodeManager {
}
}
- if (nodeinfo.shouldDropSPSWork()) {
- cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
- // Set back to false to indicate that the new value has been sent to the
- // datanode.
- nodeinfo.setDropSPSWork(false);
- }
-
- // check pending block storage movement tasks
- BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
- .getBlocksToMoveStorages();
-
- if (blkStorageMovementInfosBatch != null) {
- cmds.add(new BlockStorageMovementCommand(
- DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
- blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
- blkStorageMovementInfosBatch.getBlockMovingInfo()));
- }
-
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
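The share computation above reduces to: with the equal-ratio flag on, blocks-to-move tasks are folded into the total and every task type gets ceil(count * maxTransfers / total); with it off, replication and EC shares are computed first and moves receive at most the leftover streams. A hedged, self-contained sketch of that arithmetic (names are illustrative; the real logic is inline in DatanodeManager's heartbeat handling):

    public class TransferSharesSketch {

      static int ceilShare(int count, int maxTransfers, int total) {
        return total == 0 ? 0
            : (int) Math.ceil((double) (count * maxTransfers) / total);
      }

      // Returns {replicationTasks, ecTasks, blocksToMoveTasks}.
      static int[] compute(int maxTransfers, int pendingReplication,
          int pendingEC, int pendingMoves, boolean shareEqualRatio) {
        int total = pendingReplication + pendingEC;
        if (shareEqualRatio) {
          total += pendingMoves; // moves compete for an equal share
          return new int[] {
              ceilShare(pendingReplication, maxTransfers, total),
              ceilShare(pendingEC, maxTransfers, total),
              ceilShare(pendingMoves, maxTransfers, total)};
        }
        // replication/EC first; moves only get the leftover streams
        int repl = ceilShare(pendingReplication, maxTransfers, total);
        int ec = ceilShare(pendingEC, maxTransfers, total);
        int leftover = maxTransfers - (repl + ec);
        int moves = leftover > 0 ? Math.min(pendingMoves, leftover) : 0;
        return new int[] {repl, ec, moves};
      }

      public static void main(String[] args) {
        int[] s = compute(10, 6, 2, 2, true); // equal-ratio mode
        System.out.println(s[0] + " repl, " + s[1] + " EC, " + s[2] + " moves");
        // prints: 6 repl, 2 EC, 2 moves
      }
    }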
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 9308471..1656b16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -799,8 +799,7 @@ class BPOfferService {
LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
- blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
- blkSPSCmd.getBlockMovingTasks());
+ blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
break;
case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f537f49..b7beda4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -50,7 +51,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -513,8 +514,11 @@ class BPServiceActor implements Runnable {
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
SlowDiskReports.EMPTY_REPORT;
- BlocksStorageMovementResult[] blksMovementResults =
- getBlocksMovementResults();
+ // Get the storage move attempt finished blocks
+ List<Block> results = dn.getStoragePolicySatisfyWorker()
+ .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
+ BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
+ getStorageMoveAttemptFinishedBlocks(results);
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
@@ -527,7 +531,7 @@ class BPServiceActor implements Runnable {
requestBlockReportLease,
slowPeers,
slowDisks,
- blksMovementResults);
+ storageMoveAttemptFinishedBlks);
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
@@ -537,20 +541,23 @@ class BPServiceActor implements Runnable {
// Remove the blocks movement results after successfully transferring
// to namenode.
dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
- .remove(blksMovementResults);
+ .remove(results);
return response;
}
- private BlocksStorageMovementResult[] getBlocksMovementResults() {
- List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
- .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
- .getBlksMovementResults();
- BlocksStorageMovementResult[] blksMovementResult =
- new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
- trackIdVsMovementStatus.toArray(blksMovementResult);
+ private BlocksStorageMoveAttemptFinished getStorageMoveAttemptFinishedBlocks(
+ List<Block> finishedBlks) {
- return blksMovementResult;
+ if (finishedBlks.isEmpty()) {
+ return null;
+ }
+
+ // Create BlocksStorageMoveAttemptFinished with currently finished
+ // blocks
+ Block[] blockList = new Block[finishedBlks.size()];
+ finishedBlks.toArray(blockList);
+ return new BlocksStorageMoveAttemptFinished(blockList);
}
@VisibleForTesting
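The snapshot/send/remove sequence above is deliberate: only the blocks actually included in a successful heartbeat are removed from the handler, so a block that finishes while the RPC is in flight is kept for the next heartbeat. A runnable sketch of the idiom (SnapshotReportDemo and the String block ids are stand-ins, not HDFS code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SnapshotReportDemo {
      public static void main(String[] args) {
        List<String> finished = Collections.synchronizedList(new ArrayList<>());
        finished.add("blk_1");
        List<String> snapshot = new ArrayList<>(finished); // heartbeat payload
        finished.add("blk_2");         // finishes while the RPC is in flight
        // ... heartbeat carrying 'snapshot' succeeds here ...
        finished.removeAll(snapshot);  // remove exactly what was reported
        System.out.println(finished);  // [blk_2] survives for the next beat
      }
    }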
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index f3d2bb6..b3b9fd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -21,14 +21,14 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +41,12 @@ import org.slf4j.LoggerFactory;
public class BlockStorageMovementTracker implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(BlockStorageMovementTracker.class);
- private final CompletionService<BlockMovementResult> moverCompletionService;
+ private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
private final BlocksMovementsStatusHandler blksMovementsStatusHandler;
- // Keeps the information - trackID vs its list of blocks
- private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
- private final Map<Long, List<BlockMovementResult>> movementResults;
+ // Keeps the information - block vs its list of future move tasks
+ private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
+ private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;
private volatile boolean running = true;
@@ -59,7 +59,7 @@ public class BlockStorageMovementTracker implements Runnable {
* blocks movements status handler
*/
public BlockStorageMovementTracker(
- CompletionService<BlockMovementResult> moverCompletionService,
+ CompletionService<BlockMovementAttemptFinished> moverCompletionService,
BlocksMovementsStatusHandler handler) {
this.moverCompletionService = moverCompletionService;
this.moverTaskFutures = new HashMap<>();
@@ -82,32 +82,33 @@ public class BlockStorageMovementTracker implements Runnable {
}
}
try {
- Future<BlockMovementResult> future = moverCompletionService.take();
+ Future<BlockMovementAttemptFinished> future =
+ moverCompletionService.take();
if (future != null) {
- BlockMovementResult result = future.get();
+ BlockMovementAttemptFinished result = future.get();
LOG.debug("Completed block movement. {}", result);
- long trackId = result.getTrackId();
- List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
- .get(trackId);
+ Block block = result.getBlock();
+ List<Future<BlockMovementAttemptFinished>> blocksMoving =
+ moverTaskFutures.get(block);
if (blocksMoving == null) {
- LOG.warn("Future task doesn't exist for trackId " + trackId);
+ LOG.warn("Future task doesn't exist for block : {} ", block);
continue;
}
blocksMoving.remove(future);
- List<BlockMovementResult> resultPerTrackIdList =
- addMovementResultToTrackIdList(result);
+ List<BlockMovementAttemptFinished> resultPerTrackIdList =
+ addMovementResultToBlockIdList(result);
// Completed all the scheduled blocks movement under this 'trackId'.
- if (blocksMoving.isEmpty() || moverTaskFutures.get(trackId) == null) {
+ if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
synchronized (moverTaskFutures) {
- moverTaskFutures.remove(trackId);
+ moverTaskFutures.remove(block);
}
if (running) {
// handle completed or inprogress blocks movements per trackId.
blksMovementsStatusHandler.handle(resultPerTrackIdList);
}
- movementResults.remove(trackId);
+ movementResults.remove(block);
}
}
} catch (InterruptedException e) {
@@ -123,38 +124,39 @@ public class BlockStorageMovementTracker implements Runnable {
}
}
- private List<BlockMovementResult> addMovementResultToTrackIdList(
- BlockMovementResult result) {
- long trackId = result.getTrackId();
- List<BlockMovementResult> perTrackIdList;
+ private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
+ BlockMovementAttemptFinished result) {
+ Block block = result.getBlock();
+ List<BlockMovementAttemptFinished> perBlockIdList;
synchronized (movementResults) {
- perTrackIdList = movementResults.get(trackId);
- if (perTrackIdList == null) {
- perTrackIdList = new ArrayList<>();
- movementResults.put(trackId, perTrackIdList);
+ perBlockIdList = movementResults.get(block);
+ if (perBlockIdList == null) {
+ perBlockIdList = new ArrayList<>();
+ movementResults.put(block, perBlockIdList);
}
- perTrackIdList.add(result);
+ perBlockIdList.add(result);
}
- return perTrackIdList;
+ return perBlockIdList;
}
/**
* Add future task to the tracking list to check the completion status of the
* block movement.
*
- * @param trackID
- * tracking Id
+ * @param block
+ * block whose movement is being tracked
* @param futureTask
* future task used for moving the respective block
*/
- void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
+ void addBlock(Block block,
+ Future<BlockMovementAttemptFinished> futureTask) {
synchronized (moverTaskFutures) {
- List<Future<BlockMovementResult>> futures = moverTaskFutures
- .get(Long.valueOf(trackID));
+ List<Future<BlockMovementAttemptFinished>> futures =
+ moverTaskFutures.get(block);
// null for the first task
if (futures == null) {
futures = new ArrayList<>();
- moverTaskFutures.put(trackID, futures);
+ moverTaskFutures.put(block, futures);
}
futures.add(futureTask);
// Notify waiting tracker thread about the newly added tasks.
@@ -175,16 +177,6 @@ public class BlockStorageMovementTracker implements Runnable {
}
/**
- * @return the list of trackIds which are still waiting to complete all the
- * scheduled blocks movements.
- */
- Set<Long> getInProgressTrackIds() {
- synchronized (moverTaskFutures) {
- return moverTaskFutures.keySet();
- }
- }
-
- /**
* Sets running flag to false and clear the pending movement result queues.
*/
public void stopTracking() {
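After this refactor the tracker keys its in-flight futures by Block rather than by trackID, waiting on a CompletionService and reporting each block once its futures complete. A runnable sketch of that pattern, using String keys and a single future per block for brevity (the real tracker keeps a list of futures per block; PerKeyCompletionTracker is illustrative, not an HDFS class):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PerKeyCompletionTracker {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        Map<String, List<Future<String>>> inflight = new HashMap<>();

        for (String block : new String[] {"blk_1", "blk_2"}) {
          Future<String> f = cs.submit(() -> block + ":FINISHED");
          inflight.computeIfAbsent(block, k -> new ArrayList<>()).add(f);
        }
        for (int i = 0; i < 2; i++) {
          String result = cs.take().get();   // blocks until one move finishes
          String block = result.split(":")[0];
          inflight.remove(block);            // all futures for this block done
          System.out.println("report to NN: " + result);
        }
        pool.shutdown();
      }
    }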
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 4e57805..47318f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -32,9 +31,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@@ -62,7 +59,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -89,14 +85,11 @@ public class StoragePolicySatisfyWorker {
private final int moverThreads;
private final ExecutorService moveExecutor;
- private final CompletionService<BlockMovementResult> moverCompletionService;
+ private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
private final BlocksMovementsStatusHandler handler;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
- private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds.
- private long nextInprogressRecheckTime;
-
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -111,16 +104,6 @@ public class StoragePolicySatisfyWorker {
movementTrackerThread = new Daemon(movementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
- // Interval to check that the inprogress trackIds. The time interval is
- // proportional o the heart beat interval time period.
- final long heartbeatIntervalSeconds = conf.getTimeDuration(
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
- DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
- inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds;
- // update first inprogress recheck time to a future time stamp.
- nextInprogressRecheckTime = monotonicNow()
- + inprogressTrackIdsCheckInterval;
-
// TODO: Needs to manage the number of concurrent moves per DataNode.
}
@@ -186,30 +169,26 @@ public class StoragePolicySatisfyWorker {
* separate thread. Each task will move the block replica to the target node &
* wait for the completion.
*
- * @param trackID
- * unique tracking identifier
- * @param blockPoolID
- * block pool ID
+ * @param blockPoolID block pool identifier
+ *
* @param blockMovingInfos
* list of blocks to be moved
*/
- public void processBlockMovingTasks(long trackID, String blockPoolID,
- Collection<BlockMovingInfo> blockMovingInfos) {
+ public void processBlockMovingTasks(final String blockPoolID,
+ final Collection<BlockMovingInfo> blockMovingInfos) {
LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
- assert blkMovingInfo.getSources().length == blkMovingInfo
- .getTargets().length;
- for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
- DatanodeInfo target = blkMovingInfo.getTargets()[i];
- BlockMovingTask blockMovingTask = new BlockMovingTask(
- trackID, blockPoolID, blkMovingInfo.getBlock(),
- blkMovingInfo.getSources()[i], target,
- blkMovingInfo.getSourceStorageTypes()[i],
- blkMovingInfo.getTargetStorageTypes()[i]);
- Future<BlockMovementResult> moveCallable = moverCompletionService
- .submit(blockMovingTask);
- movementTracker.addBlock(trackID, moveCallable);
- }
+ StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+ StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+ assert sourceStorageType != targetStorageType
+ : "Source and Target storage type shouldn't be same!";
+ BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
+ blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+ blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
+ Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
+ .submit(blockMovingTask);
+ movementTracker.addBlock(blkMovingInfo.getBlock(),
+ moveCallable);
}
}
@@ -217,8 +196,7 @@ public class StoragePolicySatisfyWorker {
* This class encapsulates the process of moving the block replica to the
* given target and wait for the response.
*/
- private class BlockMovingTask implements Callable<BlockMovementResult> {
- private final long trackID;
+ private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
private final String blockPoolID;
private final Block block;
private final DatanodeInfo source;
@@ -226,10 +204,9 @@ public class StoragePolicySatisfyWorker {
private final StorageType srcStorageType;
private final StorageType targetStorageType;
- BlockMovingTask(long trackID, String blockPoolID, Block block,
+ BlockMovingTask(String blockPoolID, Block block,
DatanodeInfo source, DatanodeInfo target,
StorageType srcStorageType, StorageType targetStorageType) {
- this.trackID = trackID;
this.blockPoolID = blockPoolID;
this.block = block;
this.source = source;
@@ -239,23 +216,26 @@ public class StoragePolicySatisfyWorker {
}
@Override
- public BlockMovementResult call() {
+ public BlockMovementAttemptFinished call() {
BlockMovementStatus status = moveBlock();
- return new BlockMovementResult(trackID, block.getBlockId(), target,
- status);
+ return new BlockMovementAttemptFinished(block, source, target, status);
}
private BlockMovementStatus moveBlock() {
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
- + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+ + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
block, source, target, srcStorageType, targetStorageType);
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
+ datanode.incrementXmitsInProgress();
+
ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
DNConf dnConf = datanode.getDnConf();
- String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
+
+ String dnAddr = datanode.getDatanodeId()
+ .getXferAddr(dnConf.getConnectToDnViaHostname());
sock = datanode.newSocket();
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
dnConf.getSocketTimeout());
@@ -297,9 +277,10 @@ public class StoragePolicySatisfyWorker {
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
- block, source, target, targetStorageType, e);
+ block, source, target, targetStorageType, e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
+ datanode.decrementXmitsInProgress();
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
@@ -357,29 +338,25 @@ public class StoragePolicySatisfyWorker {
}
/**
- * This class represents result from a block movement task. This will have the
+ * This class represents the status of a block movement task. This will have the
* information of the task which was successful or failed due to errors.
*/
- static class BlockMovementResult {
- private final long trackId;
- private final long blockId;
+ static class BlockMovementAttemptFinished {
+ private final Block block;
+ private final DatanodeInfo src;
private final DatanodeInfo target;
private final BlockMovementStatus status;
- BlockMovementResult(long trackId, long blockId,
+ BlockMovementAttemptFinished(Block block, DatanodeInfo src,
DatanodeInfo target, BlockMovementStatus status) {
- this.trackId = trackId;
- this.blockId = blockId;
+ this.block = block;
+ this.src = src;
this.target = target;
this.status = status;
}
- long getTrackId() {
- return trackId;
- }
-
- long getBlockId() {
- return blockId;
+ Block getBlock() {
+ return block;
}
BlockMovementStatus getStatus() {
@@ -388,99 +365,79 @@ public class StoragePolicySatisfyWorker {
@Override
public String toString() {
- return new StringBuilder().append("Block movement result(\n ")
- .append("track id: ").append(trackId).append(" block id: ")
- .append(blockId).append(" target node: ").append(target)
+ return new StringBuilder().append("Block movement attempt finished(\n ")
+ .append(" block : ")
+ .append(block).append(" src node: ").append(src)
+ .append(" target node: ").append(target)
.append(" movement status: ").append(status).append(")").toString();
}
}
/**
* Blocks movements status handler, which is used to collect details of the
- * completed or inprogress list of block movements and this status(success or
- * failure or inprogress) will be send to the namenode via heartbeat.
+ * completed block movements; these attempt-finished (success or
+ * failure) blocks are then sent to the namenode via heartbeat.
*/
- class BlocksMovementsStatusHandler {
- private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+ public static class BlocksMovementsStatusHandler {
+ private final List<Block> blockIdVsMovementStatus =
new ArrayList<>();
/**
- * Collect all the block movement results. Later this will be send to
- * namenode via heart beat.
+ * Collect all the storage movement attempt finished blocks. Later this will
+ * be sent to the namenode via heartbeat.
*
- * @param results
- * result of all the block movements per trackId
+ * @param moveAttemptFinishedBlks
+ * set of storage movement attempt finished blocks
*/
- void handle(List<BlockMovementResult> resultsPerTrackId) {
- BlocksStorageMovementResult.Status status =
- BlocksStorageMovementResult.Status.SUCCESS;
- long trackId = -1;
- for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
- trackId = blockMovementResult.getTrackId();
- if (blockMovementResult.status ==
- BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
- status = BlocksStorageMovementResult.Status.FAILURE;
- // If any of the block movement is failed, then mark as failure so
- // that namenode can take a decision to retry the blocks associated to
- // the given trackId.
- break;
- }
- }
+ void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+ List<Block> blocks = new ArrayList<>();
- // Adding to the tracking results list. Later this will be send to
+ for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+ blocks.add(item.getBlock());
+ }
+ // Add to the tracking report list. Later this will be sent to the
// namenode via datanode heartbeat.
- synchronized (trackIdVsMovementStatus) {
- trackIdVsMovementStatus.add(
- new BlocksStorageMovementResult(trackId, status));
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.addAll(blocks);
}
}
/**
- * @return unmodifiable list of blocks storage movement results.
+ * @return unmodifiable list of storage movement attempt finished blocks.
*/
- List<BlocksStorageMovementResult> getBlksMovementResults() {
- List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
- // 1. Adding all the completed trackids.
- synchronized (trackIdVsMovementStatus) {
- if (trackIdVsMovementStatus.size() > 0) {
- movementResults = Collections
- .unmodifiableList(trackIdVsMovementStatus);
+ List<Block> getMoveAttemptFinishedBlocks() {
+ List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+ // 1. Adding all the completed block ids.
+ synchronized (blockIdVsMovementStatus) {
+ if (blockIdVsMovementStatus.size() > 0) {
+ moveAttemptFinishedBlks = Collections
+ .unmodifiableList(blockIdVsMovementStatus);
}
}
- // 2. Adding the in progress track ids after those which are completed.
- Set<Long> inProgressTrackIds = getInProgressTrackIds();
- for (Long trackId : inProgressTrackIds) {
- movementResults.add(new BlocksStorageMovementResult(trackId,
- BlocksStorageMovementResult.Status.IN_PROGRESS));
- }
- return movementResults;
+ return moveAttemptFinishedBlks;
}
/**
- * Remove the blocks storage movement results.
+ * Remove the storage movement attempt finished blocks from the tracking
+ * list.
*
- * @param results
- * set of blocks storage movement results
+ * @param moveAttemptFinishedBlks
+ * set of storage movement attempt finished blocks
*/
- void remove(BlocksStorageMovementResult[] results) {
- if (results != null) {
- synchronized (trackIdVsMovementStatus) {
- for (BlocksStorageMovementResult blocksMovementResult : results) {
- trackIdVsMovementStatus.remove(blocksMovementResult);
- }
- }
+ void remove(List<Block> moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks != null) {
+ // Match the locking used by handle() and getMoveAttemptFinishedBlocks()
+ // so concurrent mover-thread reports cannot race this removal.
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
+ }
+ }
}
/**
- * Clear the trackID vs movement status tracking map.
+ * Clear the blockID vs movement status tracking list.
*/
void removeAll() {
- synchronized (trackIdVsMovementStatus) {
- trackIdVsMovementStatus.clear();
+ synchronized (blockIdVsMovementStatus) {
+ blockIdVsMovementStatus.clear();
}
}
-
}
@VisibleForTesting
@@ -498,23 +455,4 @@ public class StoragePolicySatisfyWorker {
movementTracker.removeAll();
handler.removeAll();
}
-
- /**
- * Gets list of trackids which are inprogress. Will do collection periodically
- * on 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time.
- * millis' interval.
- *
- * @return collection of trackids which are inprogress
- */
- private Set<Long> getInProgressTrackIds() {
- Set<Long> trackIds = new HashSet<>();
- long now = monotonicNow();
- if (nextInprogressRecheckTime >= now) {
- trackIds = movementTracker.getInProgressTrackIds();
-
- // schedule next re-check interval
- nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
- }
- return trackIds;
- }
}
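
For context on the shape of the new API: the refactor above replaces per-trackId BlocksStorageMovementResult objects with a plain list of attempt-finished blocks. The following is a minimal self-contained sketch of that collect/report/acknowledge cycle, using only java.util types and a hypothetical Block stand-in rather than the real Hadoop classes:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Sketch of the DN-side collect/report/acknowledge cycle. Block is a
 * hypothetical stand-in for org.apache.hadoop.hdfs.protocol.Block.
 */
class FinishedBlocksHandlerSketch {

  static final class Block {
    final long id;
    Block(long id) {
      this.id = id;
    }
  }

  private final List<Block> finishedBlocks = new ArrayList<>();

  /** Mover threads report attempt-finished blocks here. */
  void handle(List<Block> moveAttemptFinishedBlks) {
    synchronized (finishedBlocks) {
      finishedBlocks.addAll(moveAttemptFinishedBlks);
    }
  }

  /** The heartbeat thread snapshots the pending batch for the namenode. */
  List<Block> getMoveAttemptFinishedBlocks() {
    synchronized (finishedBlocks) {
      return Collections.unmodifiableList(new ArrayList<>(finishedBlocks));
    }
  }

  /** Once the namenode has consumed a batch, drop it from tracking. */
  void remove(List<Block> acked) {
    synchronized (finishedBlocks) {
      finishedBlocks.removeAll(acked);
    }
  }
}

The snapshot copy in getMoveAttemptFinishedBlocks() is the defensive variant; the patch itself returns an unmodifiable view over the live list.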
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 549819f..cc5b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -22,15 +22,12 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,14 +35,12 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
- * A monitor class for checking whether block storage movements finished or not.
- * If block storage movement results from datanode indicates about the movement
- * success, then it will just remove the entries from tracking. If it reports
- * failure, then it will add back to needed block storage movements list. If it
- * reports in_progress, that means the blocks movement is in progress and the
- * coordinator is still tracking the movement. If no DN reports about movement
- * for longer time, then such items will be retries automatically after timeout.
- * The default timeout would be 30mins.
+ * A monitor class for checking whether block storage movement attempts have
+ * completed or not. If it receives a block storage movement attempt
+ * status (either success or failure) from a DN, then it simply removes the
+ * entries from tracking. If no DN reports a finished movement attempt for a
+ * longer period, then such items are retried automatically after a timeout.
+ * The default timeout is 5 minutes.
*/
public class BlockStorageMovementAttemptedItems {
private static final Logger LOG =
@@ -55,37 +50,34 @@ public class BlockStorageMovementAttemptedItems {
* A map holds the items which are already taken for blocks movements
* processing and sent to DNs.
*/
- private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
- private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
+ private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+ private final List<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
- private final StoragePolicySatisfier sps;
//
- // It might take anywhere between 20 to 60 minutes before
+ // It might take anywhere between 5 and 10 minutes before
// a request is timed out.
//
- private long selfRetryTimeout = 20 * 60 * 1000;
+ private long selfRetryTimeout = 5 * 60 * 1000;
//
- // It might take anywhere between 5 to 10 minutes before
+ // It might take anywhere between 1 and 2 minutes before
// a request is timed out.
//
- private long minCheckTimeout = 5 * 60 * 1000; // minimum value
+ private long minCheckTimeout = 1 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded;
public BlockStorageMovementAttemptedItems(long recheckTimeout,
long selfRetryTimeout,
- BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
- StoragePolicySatisfier sps) {
+ BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
if (recheckTimeout > 0) {
this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
}
this.selfRetryTimeout = selfRetryTimeout;
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
- storageMovementAttemptedItems = new HashMap<>();
- storageMovementAttemptedResults = new ArrayList<>();
- this.sps = sps;
+ storageMovementAttemptedItems = new ArrayList<>();
+ movementFinishedBlocks = new ArrayList<>();
}
/**
@@ -94,33 +86,26 @@ public class BlockStorageMovementAttemptedItems {
*
* @param itemInfo
* - tracking info
- * @param allBlockLocsAttemptedToSatisfy
- * - failed to find matching target nodes to satisfy storage type
- * for all the block locations of the given blockCollectionID
*/
- public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
+ public void add(AttemptedItemInfo itemInfo) {
synchronized (storageMovementAttemptedItems) {
- AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
- itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
- allBlockLocsAttemptedToSatisfy);
- storageMovementAttemptedItems.put(itemInfo.getTrackId(),
- attemptedItemInfo);
+ storageMovementAttemptedItems.add(itemInfo);
}
}
/**
- * Add the trackIDBlocksStorageMovementResults to
- * storageMovementAttemptedResults.
+ * Add the storage movement attempt finished blocks to
+ * movementFinishedBlocks.
*
- * @param blksMovementResults
+ * @param moveAttemptFinishedBlks
+ * storage movement attempt finished blocks
*/
- public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
- if (blksMovementResults.length == 0) {
+ public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+ if (moveAttemptFinishedBlks.length == 0) {
return;
}
- synchronized (storageMovementAttemptedResults) {
- storageMovementAttemptedResults
- .addAll(Arrays.asList(blksMovementResults));
+ synchronized (movementFinishedBlocks) {
+ movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
}
}
@@ -129,8 +114,8 @@ public class BlockStorageMovementAttemptedItems {
*/
public synchronized void start() {
monitorRunning = true;
- timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
- timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
+ timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
+ timerThread.setName("BlocksStorageMovementAttemptMonitor");
timerThread.start();
}
@@ -163,82 +148,22 @@ public class BlockStorageMovementAttemptedItems {
}
/**
- * This class contains information of an attempted trackID. Information such
- * as, (a)last attempted or reported time stamp, (b)whether all the blocks in
- * the trackID were attempted and blocks movement has been scheduled to
- * satisfy storage policy. This is used by
- * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
- */
- private final static class AttemptedItemInfo extends ItemInfo {
- private long lastAttemptedOrReportedTime;
- private final boolean allBlockLocsAttemptedToSatisfy;
-
- /**
- * AttemptedItemInfo constructor.
- *
- * @param rootId
- * rootId for trackId
- * @param trackId
- * trackId for file.
- * @param lastAttemptedOrReportedTime
- * last attempted or reported time
- * @param allBlockLocsAttemptedToSatisfy
- * whether all the blocks in the trackID were attempted and blocks
- * movement has been scheduled to satisfy storage policy
- */
- private AttemptedItemInfo(long rootId, long trackId,
- long lastAttemptedOrReportedTime,
- boolean allBlockLocsAttemptedToSatisfy) {
- super(rootId, trackId);
- this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
- this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
- }
-
- /**
- * @return last attempted or reported time stamp.
- */
- private long getLastAttemptedOrReportedTime() {
- return lastAttemptedOrReportedTime;
- }
-
- /**
- * @return true/false. True value represents that, all the block locations
- * under the trackID has found matching target nodes to satisfy
- * storage policy. False value represents that, trackID needed
- * retries to satisfy the storage policy for some of the block
- * locations.
- */
- private boolean isAllBlockLocsAttemptedToSatisfy() {
- return allBlockLocsAttemptedToSatisfy;
- }
-
- /**
- * Update lastAttemptedOrReportedTime, so that the expiration time will be
- * postponed to future.
- */
- private void touchLastReportedTimeStamp() {
- this.lastAttemptedOrReportedTime = monotonicNow();
- }
-
- }
-
- /**
- * A monitor class for checking block storage movement result and long waiting
- * items periodically.
+ * A monitor class for checking block storage movement attempt status and
+ * long-waiting items periodically.
*/
- private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
+ private class BlocksStorageMovementAttemptMonitor implements Runnable {
@Override
public void run() {
while (monitorRunning) {
try {
- blockStorageMovementResultCheck();
+ blockStorageMovementReportedItemsCheck();
blocksStorageMovementUnReportedItemsCheck();
Thread.sleep(minCheckTimeout);
} catch (InterruptedException ie) {
- LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
+ LOG.info("BlocksStorageMovementAttemptMonitor thread "
+ "is interrupted.", ie);
} catch (IOException ie) {
- LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
+ LOG.warn("BlocksStorageMovementAttemptMonitor thread "
+ "received exception and exiting.", ie);
}
}
@@ -248,29 +173,21 @@ public class BlockStorageMovementAttemptedItems {
@VisibleForTesting
void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) {
- Iterator<Entry<Long, AttemptedItemInfo>> iter =
- storageMovementAttemptedItems.entrySet().iterator();
+ Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+ .iterator();
long now = monotonicNow();
while (iter.hasNext()) {
- Entry<Long, AttemptedItemInfo> entry = iter.next();
- AttemptedItemInfo itemInfo = entry.getValue();
+ AttemptedItemInfo itemInfo = iter.next();
if (now > itemInfo.getLastAttemptedOrReportedTime()
+ selfRetryTimeout) {
- Long blockCollectionID = entry.getKey();
- synchronized (storageMovementAttemptedResults) {
- if (!isExistInResult(blockCollectionID)) {
- ItemInfo candidate = new ItemInfo(
- itemInfo.getStartId(), blockCollectionID);
- blockStorageMovementNeeded.add(candidate);
- iter.remove();
- LOG.info("TrackID: {} becomes timed out and moved to needed "
- + "retries queue for next iteration.", blockCollectionID);
- } else {
- LOG.info("Blocks storage movement results for the"
- + " tracking id : " + blockCollectionID
- + " is reported from one of the co-ordinating datanode."
- + " So, the result will be processed soon.");
- }
+ Long blockCollectionID = itemInfo.getTrackId();
+ synchronized (movementFinishedBlocks) {
+ ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
+ blockCollectionID);
+ blockStorageMovementNeeded.add(candidate);
+ iter.remove();
+ LOG.info("TrackID: {} becomes timed out and moved to needed "
+ + "retries queue for next iteration.", blockCollectionID);
}
}
}
@@ -278,118 +195,38 @@ public class BlockStorageMovementAttemptedItems {
}
}
- private boolean isExistInResult(Long blockCollectionID) {
- Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
- .iterator();
- while (iter.hasNext()) {
- BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
- if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
- return true;
- }
- }
- return false;
- }
-
@VisibleForTesting
- void blockStorageMovementResultCheck() throws IOException {
- synchronized (storageMovementAttemptedResults) {
- Iterator<BlocksStorageMovementResult> resultsIter =
- storageMovementAttemptedResults.iterator();
- while (resultsIter.hasNext()) {
- boolean isInprogress = false;
- // TrackID need to be retried in the following cases:
- // 1) All or few scheduled block(s) movement has been failed.
- // 2) All the scheduled block(s) movement has been succeeded but there
- // are unscheduled block(s) movement in this trackID. Say, some of
- // the blocks in the trackID couldn't finding any matching target node
- // for scheduling block movement in previous SPS iteration.
- BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
- .next();
+ void blockStorageMovementReportedItemsCheck() throws IOException {
+ synchronized (movementFinishedBlocks) {
+ Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
+ while (finishedBlksIter.hasNext()) {
+ Block blk = finishedBlksIter.next();
synchronized (storageMovementAttemptedItems) {
- Status status = storageMovementAttemptedResult.getStatus();
- long trackId = storageMovementAttemptedResult.getTrackId();
- AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
- .get(trackId);
- // itemInfo is null means no root for trackId, using trackId only as
- // root and handling it in
- // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
- // the xAttr
- ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
- ? attemptedItemInfo.getStartId() : trackId, trackId);
- switch (status) {
- case FAILURE:
- if (attemptedItemInfo != null) {
- blockStorageMovementNeeded.add(itemInfo);
- LOG.warn("Blocks storage movement results for the tracking id:"
- + "{} is reported from co-ordinating datanode, but result"
- + " status is FAILURE. So, added for retry", trackId);
- } else {
- LOG.info("Blocks storage movement is FAILURE for the track"
- + " id {}. But the trackID doesn't exists in"
- + " storageMovementAttemptedItems list.", trackId);
- blockStorageMovementNeeded
- .removeItemTrackInfo(itemInfo);
- }
- break;
- case SUCCESS:
- // ItemInfo could be null. One case is, before the blocks movements
- // result arrives the attempted trackID became timed out and then
- // removed the trackID from the storageMovementAttemptedItems list.
- // TODO: Need to ensure that trackID is added to the
- // 'blockStorageMovementNeeded' queue for retries to handle the
- // following condition. If all the block locations under the trackID
- // are attempted and failed to find matching target nodes to satisfy
- // storage policy in previous SPS iteration.
- String msg = "Blocks storage movement is SUCCESS for the track id: "
- + trackId + " reported from co-ordinating datanode.";
- if (attemptedItemInfo != null) {
- if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
- blockStorageMovementNeeded
- .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
- LOG.warn("{} But adding trackID back to retry queue as some of"
- + " the blocks couldn't find matching target nodes in"
- + " previous SPS iteration.", msg);
- } else {
- LOG.info(msg);
- blockStorageMovementNeeded
- .removeItemTrackInfo(itemInfo);
- }
- } else {
- LOG.info("{} But the trackID doesn't exists in "
- + "storageMovementAttemptedItems list", msg);
+ Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
+ .iterator();
+ while (iterator.hasNext()) {
+ AttemptedItemInfo attemptedItemInfo = iterator.next();
+ attemptedItemInfo.getBlocks().remove(blk);
+ if (attemptedItemInfo.getBlocks().isEmpty()) {
+ // TODO: try adding this at the front of the queue, so that this
+ // element gets processed first and can be cleaned from the queue
+ // quickly, as all of its movements are already done.
blockStorageMovementNeeded
- .removeItemTrackInfo(itemInfo);
- }
- break;
- case IN_PROGRESS:
- isInprogress = true;
- attemptedItemInfo = storageMovementAttemptedItems
- .get(storageMovementAttemptedResult.getTrackId());
- if(attemptedItemInfo != null){
- // update the attempted expiration time to next cycle.
- attemptedItemInfo.touchLastReportedTimeStamp();
+ .add(new ItemInfo(attemptedItemInfo.getStartId(),
+ attemptedItemInfo.getTrackId()));
+ iterator.remove();
}
- break;
- default:
- LOG.error("Unknown status: {}", status);
- break;
- }
- // Remove trackID from the attempted list if the attempt has been
- // completed(success or failure), if any.
- if (!isInprogress) {
- storageMovementAttemptedItems
- .remove(storageMovementAttemptedResult.getTrackId());
}
}
- // Remove trackID from results as processed above.
- resultsIter.remove();
+ // Remove attempted blocks from movementFinishedBlocks list.
+ finishedBlksIter.remove();
}
}
}
@VisibleForTesting
- public int resultsCount() {
- return storageMovementAttemptedResults.size();
+ public int getMovementFinishedBlocksCount() {
+ return movementFinishedBlocks.size();
}
@VisibleForTesting
@@ -398,7 +235,7 @@ public class BlockStorageMovementAttemptedItems {
}
public void clearQueues() {
- storageMovementAttemptedResults.clear();
+ movementFinishedBlocks.clear();
storageMovementAttemptedItems.clear();
}
}
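
The rewritten monitor is easier to follow in isolation. Below is a condensed, self-contained model of its two periodic checks; the 5-minute selfRetryTimeout matches the patch, while the type and queue names here are illustrative stand-ins, not the real classes:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Condensed model of the BlocksStorageMovementAttemptMonitor checks. */
class AttemptMonitorSketch {

  static final class AttemptedItem {
    final long startId;
    final long trackId;
    final long lastAttemptedOrReportedTime;
    final Set<Long> pendingBlockIds = new HashSet<>();

    AttemptedItem(long startId, long trackId, long now) {
      this.startId = startId;
      this.trackId = trackId;
      this.lastAttemptedOrReportedTime = now;
    }
  }

  private final long selfRetryTimeout = 5 * 60 * 1000; // as in the patch
  private final List<AttemptedItem> attempted = new ArrayList<>();
  private final List<Long> reportedFinishedBlockIds = new ArrayList<>();
  private final List<Long> needsRetryQueue = new ArrayList<>(); // stand-in

  /** Check 1: re-queue items no DN has reported on within the timeout. */
  void unReportedItemsCheck(long now) {
    Iterator<AttemptedItem> it = attempted.iterator();
    while (it.hasNext()) {
      AttemptedItem item = it.next();
      if (now > item.lastAttemptedOrReportedTime + selfRetryTimeout) {
        needsRetryQueue.add(item.trackId);
        it.remove();
      }
    }
  }

  /**
   * Check 2: strike each reported block off every attempted item; once an
   * item's pending set drains, re-queue the file so the SPS can verify the
   * result and clean up (or retry) in its next iteration.
   */
  void reportedItemsCheck() {
    for (Long blockId : reportedFinishedBlockIds) {
      Iterator<AttemptedItem> it = attempted.iterator();
      while (it.hasNext()) {
        AttemptedItem item = it.next();
        item.pendingBlockIds.remove(blockId);
        if (item.pendingBlockIds.isEmpty()) {
          needsRetryQueue.add(item.trackId);
          it.remove();
        }
      }
    }
    reportedFinishedBlockIds.clear();
  }
}

Note what disappeared along with the coordinator: there is no IN_PROGRESS status to keep an entry alive, so an item either drains block by block or falls into the timeout path.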
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
deleted file mode 100644
index a790c13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-
-/**
- * This class represents a batch of blocks under one trackId which needs to move
- * its storage locations to satisfy the storage policy.
- */
-public class BlockStorageMovementInfosBatch {
- private long trackID;
- private List<BlockMovingInfo> blockMovingInfos;
-
- /**
- * Constructor to create the block storage movement infos batch.
- *
- * @param trackID
- * - unique identifier which will be used for tracking the given set
- * of blocks movement.
- * @param blockMovingInfos
- * - list of block to storage infos.
- */
- public BlockStorageMovementInfosBatch(long trackID,
- List<BlockMovingInfo> blockMovingInfos) {
- this.trackID = trackID;
- this.blockMovingInfos = blockMovingInfos;
- }
-
- public long getTrackID() {
- return trackID;
- }
-
- public List<BlockMovingInfo> getBlockMovingInfo() {
- return blockMovingInfos;
- }
-
- @Override
- public String toString() {
- return new StringBuilder().append("BlockStorageMovementInfosBatch(\n ")
- .append("TrackID: ").append(trackID).append(" BlockMovingInfos: ")
- .append(blockMovingInfos).append(")").toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1abca8dd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 33d6fe7..7bfc3d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -266,7 +266,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3884,7 +3884,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks,
- BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+ BlocksStorageMoveAttemptFinished blksMovementsFinished)
+ throws IOException {
readLock();
try {
//get datanode commands
@@ -3905,11 +3906,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (!sps.isRunning()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Storage policy satisfier is not running. So, ignoring block "
- + "storage movement results sent by co-ordinator datanode");
+ "Storage policy satisfier is not running. So, ignoring storage"
+ + " movement attempt finished block info sent by DN");
}
} else {
- sps.handleBlocksStorageMovementResults(blksMovementResults);
+ sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
}
}
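
End to end, the heartbeat now carries a single BlocksStorageMoveAttemptFinished payload rather than a BlocksStorageMovementResult array. Its definition is not shown in this section; judging from the DN and SPS call sites it presumably wraps the finished blocks, roughly along these lines (a hedged sketch, not the committed class):

import org.apache.hadoop.hdfs.protocol.Block;

/** Hypothetical shape of the new heartbeat payload. */
public class BlocksStorageMoveAttemptFinishedSketch {
  private final Block[] blocks;

  public BlocksStorageMoveAttemptFinishedSketch(Block[] blocks) {
    this.blocks = blocks;
  }

  public Block[] getBlocks() {
    return blocks;
  }
}

With a single opaque payload, FSNamesystem only has to forward it: when sps.isRunning() it calls sps.handleStorageMovementAttemptFinishedBlks(...), otherwise the report is dropped with a debug log, exactly as the hunk above shows.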