Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/02/17 14:50:15 UTC
[35/50] [abbrv] hadoop git commit: HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R
HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/047e1cd4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/047e1cd4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/047e1cd4
Branch: refs/heads/HDFS-10285
Commit: 047e1cd4368f7c65603c1ad7337e965a4ec306cb
Parents: e4983d5
Author: Rakesh Radhakrishnan <ra...@apache.org>
Authored: Thu Nov 3 09:39:14 2016 +0530
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Fri Feb 17 19:49:39 2017 +0530
----------------------------------------------------------------------
.../DatanodeProtocolClientSideTranslatorPB.java | 9 ++-
.../DatanodeProtocolServerSideTranslatorPB.java | 4 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 52 +++++++++++++++
.../server/blockmanagement/BlockManager.java | 4 ++
.../hdfs/server/datanode/BPServiceActor.java | 25 ++++++-
.../datanode/StoragePolicySatisfyWorker.java | 70 +++++++++++++++++---
.../hdfs/server/namenode/FSNamesystem.java | 10 ++-
.../hdfs/server/namenode/NameNodeRpcServer.java | 7 +-
.../server/namenode/StoragePolicySatisfier.java | 23 +++++++
.../protocol/BlocksStorageMovementResult.java | 64 ++++++++++++++++++
.../hdfs/server/protocol/DatanodeProtocol.java | 5 +-
.../src/main/proto/DatanodeProtocol.proto | 14 ++++
.../TestNameNodePrunesMissingStorages.java | 3 +-
.../datanode/InternalDataNodeTestUtils.java | 4 +-
.../server/datanode/TestBPOfferService.java | 8 ++-
.../hdfs/server/datanode/TestBlockRecovery.java | 4 +-
.../server/datanode/TestDataNodeLifeline.java | 7 +-
.../TestDatanodeProtocolRetryPolicy.java | 4 +-
.../server/datanode/TestFsDatasetCache.java | 4 +-
.../TestStoragePolicySatisfyWorker.java | 13 ++--
.../hdfs/server/datanode/TestStorageReport.java | 4 +-
.../server/namenode/NNThroughputBenchmark.java | 6 +-
.../hdfs/server/namenode/NameNodeAdapter.java | 3 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 4 +-
.../namenode/TestStoragePolicySatisfier.java | 50 ++++++++++++++
25 files changed, 363 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index d9e6026..09a8c32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -136,7 +137,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
- @Nonnull SlowPeerReports slowPeers) throws IOException {
+ @Nonnull SlowPeerReports slowPeers,
+ BlocksStorageMovementResult[] blksMovementResults) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -156,6 +158,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
if (slowPeers.haveSlowPeers()) {
builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
}
+
+ // Add the blocks movement results to the heartbeat request.
+ builder.addAllBlksMovementResults(
+ PBHelper.convertBlksMovResults(blksMovementResults));
+
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index b1c8e34..56d3f20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -121,7 +121,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease(),
- PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
+ PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
+ PBHelper.convertBlksMovResults(
+ request.getBlksMovementResultsList()));
} catch (IOException e) {
throw new ServiceException(e);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 6949d71..8caa277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailur
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -96,6 +97,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -884,6 +887,55 @@ public class PBHelper {
return SlowPeerReports.create(slowPeersMap);
}
+ public static BlocksStorageMovementResult[] convertBlksMovResults(
+ List<BlocksStorageMovementResultProto> protos) {
+ BlocksStorageMovementResult[] results =
+ new BlocksStorageMovementResult[protos.size()];
+ for (int i = 0; i < protos.size(); i++) {
+ BlocksStorageMovementResultProto resultProto = protos.get(i);
+ BlocksStorageMovementResult.Status status;
+ switch (resultProto.getStatus()) {
+ case SUCCESS:
+ status = Status.SUCCESS;
+ break;
+ case FAILURE:
+ status = Status.FAILURE;
+ break;
+ default:
+ throw new AssertionError("Unknown status: " + resultProto.getStatus());
+ }
+ results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
+ status);
+ }
+ return results;
+ }
+
+ public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
+ BlocksStorageMovementResult[] blocksMovementResults) {
+ List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
+ new ArrayList<>();
+ BlocksStorageMovementResultProto.Builder builder =
+ BlocksStorageMovementResultProto.newBuilder();
+ for (int i = 0; i < blocksMovementResults.length; i++) {
+ BlocksStorageMovementResult report = blocksMovementResults[i];
+ builder.setTrackID(report.getTrackId());
+ BlocksStorageMovementResultProto.Status status;
+ switch (report.getStatus()) {
+ case SUCCESS:
+ status = BlocksStorageMovementResultProto.Status.SUCCESS;
+ break;
+ case FAILURE:
+ status = BlocksStorageMovementResultProto.Status.FAILURE;
+ break;
+ default:
+ throw new AssertionError("Unknown status: " + report.getStatus());
+ }
+ builder.setStatus(status);
+ blocksMovementResultsProto.add(builder.build());
+ }
+ return blocksMovementResultsProto;
+ }
+
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
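
The two convertBlksMovResults overloads above are symmetric: the datanode converts its results to protobuf for the heartbeat request, and the namenode converts them back while decoding it. A minimal round-trip sketch (illustration only, not part of this commit; assumes the hadoop-hdfs classes above are on the classpath):

import java.util.List;

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;

public class BlksMovResultsRoundTrip {
  public static void main(String[] args) {
    // One result per trackId: trackId 1 succeeded, trackId 2 needs a retry.
    BlocksStorageMovementResult[] results = new BlocksStorageMovementResult[] {
        new BlocksStorageMovementResult(1L,
            BlocksStorageMovementResult.Status.SUCCESS),
        new BlocksStorageMovementResult(2L,
            BlocksStorageMovementResult.Status.FAILURE)};

    // Datanode side: convert to protobuf before adding to the heartbeat request.
    List<BlocksStorageMovementResultProto> protos =
        PBHelper.convertBlksMovResults(results);

    // Namenode side: convert back while handling the heartbeat request.
    BlocksStorageMovementResult[] decoded =
        PBHelper.convertBlksMovResults(protos);
    for (BlocksStorageMovementResult r : decoded) {
      System.out.println("trackId=" + r.getTrackId() + " status=" + r.getStatus());
    }
  }
}
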
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d5bb793..a3fae3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4746,4 +4746,8 @@ public class BlockManager implements BlockStatsMXBean {
public void satisfyStoragePolicy(long id) {
storageMovementNeeded.add(id);
}
+
+ public StoragePolicySatisfier getStoragePolicySatisfier() {
+ return sps;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 644a8ab..da1eddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -502,6 +503,10 @@ class BPServiceActor implements Runnable {
slowPeersReportDue && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
+
+ BlocksStorageMovementResult[] blksMovementResults =
+ getBlocksMovementResults();
+
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
@@ -511,15 +516,33 @@ class BPServiceActor implements Runnable {
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
- slowPeers);
+ slowPeers,
+ blksMovementResults);
if (slowPeersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextSlowPeerReport();
}
+
+ // Remove the blocks movement results after successfully transferring
+ // to namenode.
+ dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+ .remove(blksMovementResults);
+
return response;
}
+ private BlocksStorageMovementResult[] getBlocksMovementResults() {
+ List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
+ .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+ .getBlksMovementResults();
+ BlocksStorageMovementResult[] blksMovementResult =
+ new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
+ trackIdVsMovementStatus.toArray(blksMovementResult);
+
+ return blksMovementResult;
+ }
+
@VisibleForTesting
void sendLifelineForTests() throws IOException {
lifelineSender.sendLifeline();
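
Note the ordering in the heartbeat path above: the pending results are fetched before the RPC and removed only after sendHeartbeat() returns, so results from a failed heartbeat stay queued for the next attempt. A self-contained sketch of that report-then-clear pattern (illustration only; ReportThenClearSketch is a hypothetical class, not part of this commit):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;

public class ReportThenClearSketch {
  // Results queued by the completion handler, waiting for the next heartbeat.
  private final List<BlocksStorageMovementResult> pending = new ArrayList<>();

  synchronized BlocksStorageMovementResult[] snapshot() {
    return pending.toArray(new BlocksStorageMovementResult[0]);
  }

  synchronized void removeReported(BlocksStorageMovementResult[] reported) {
    for (BlocksStorageMovementResult r : reported) {
      pending.remove(r);   // identity-based removal of the reported instances
    }
  }

  public static void main(String[] args) {
    ReportThenClearSketch handler = new ReportThenClearSketch();
    handler.pending.add(new BlocksStorageMovementResult(7L, Status.SUCCESS));

    BlocksStorageMovementResult[] toSend = handler.snapshot();
    // ... the heartbeat RPC carrying toSend would go here; only on success:
    handler.removeReported(toSend);
    System.out.println("pending after report: " + handler.pending.size()); // 0
  }
}
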
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 604fb4a..8a8f87d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@@ -276,7 +278,7 @@ public class StoragePolicySatisfyWorker {
/**
* Block movement status code.
*/
- enum BlockMovementStatus {
+ public static enum BlockMovementStatus {
/** Success. */
DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
/**
@@ -343,26 +345,72 @@ public class StoragePolicySatisfyWorker {
/**
* Blocks movements completion handler, which is used to collect details of
- * the completed list of block movements and notify the namenode about the
- * success or failures.
+ * the completed list of block movements; this status (success or failure)
+ * will be sent to the namenode via heartbeat.
*/
static class BlocksMovementsCompletionHandler {
- private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
+ private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+ new ArrayList<>();
/**
- * Collect all the block movement results and notify namenode.
+ * Collect all the block movement results. Later these will be sent to the
+ * namenode via heartbeat.
*
* @param results
* result of all the block movements per trackId
*/
- void handle(List<BlockMovementResult> results) {
- completedBlocks.addAll(results);
- // TODO: notify namenode about the success/failures.
+ void handle(List<BlockMovementResult> resultsPerTrackId) {
+ BlocksStorageMovementResult.Status status =
+ BlocksStorageMovementResult.Status.SUCCESS;
+ long trackId = -1;
+ for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
+ trackId = blockMovementResult.getTrackId();
+ if (blockMovementResult.status ==
+ BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
+ status = BlocksStorageMovementResult.Status.FAILURE;
+ // If any of the block movements failed, mark the result as failure so
+ // that the namenode can decide to retry the blocks associated with
+ // the given trackId.
+ break;
+ }
+ }
+
+ // Add to the tracking results list. Later this will be sent to the
+ // namenode via the datanode heartbeat.
+ synchronized (trackIdVsMovementStatus) {
+ trackIdVsMovementStatus.add(
+ new BlocksStorageMovementResult(trackId, status));
+ }
+ }
+
+ /**
+ * @return unmodifiable list of blocks storage movement results.
+ */
+ List<BlocksStorageMovementResult> getBlksMovementResults() {
+ synchronized (trackIdVsMovementStatus) {
+ if (trackIdVsMovementStatus.size() <= 0) {
+ return new ArrayList<>();
+ }
+ List<BlocksStorageMovementResult> results = Collections
+ .unmodifiableList(trackIdVsMovementStatus);
+ return results;
+ }
}
- @VisibleForTesting
- List<BlockMovementResult> getCompletedBlocks() {
- return completedBlocks;
+ /**
+ * Remove the blocks storage movement results.
+ *
+ * @param results
+ * set of blocks storage movement results
+ */
+ void remove(BlocksStorageMovementResult[] results) {
+ if (results != null) {
+ synchronized (trackIdVsMovementStatus) {
+ for (BlocksStorageMovementResult blocksMovementResult : results) {
+ trackIdVsMovementStatus.remove(blocksMovementResult);
+ }
+ }
+ }
}
}
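
handle() above collapses the per-block results of one trackId into a single BlocksStorageMovementResult: any failed block move marks the whole trackId as FAILURE so the namenode can retry it. A small sketch of that aggregation rule (illustration only; aggregate is a hypothetical helper, not part of this commit):

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;

public class TrackIdAggregationSketch {
  // Collapse per-block success flags into one result for the given trackId.
  static BlocksStorageMovementResult aggregate(long trackId,
      List<Boolean> blockMovesSucceeded) {
    Status status = Status.SUCCESS;
    for (boolean ok : blockMovesSucceeded) {
      if (!ok) {
        status = Status.FAILURE; // one failed move fails the whole trackId
        break;
      }
    }
    return new BlocksStorageMovementResult(trackId, status);
  }

  public static void main(String[] args) {
    BlocksStorageMovementResult r =
        aggregate(10L, Arrays.asList(true, false, true));
    System.out.println(r.getTrackId() + " -> " + r.getStatus()); // 10 -> FAILURE
  }
}
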
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 38a326c..09ef4e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -247,6 +247,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3642,7 +3643,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
- @Nonnull SlowPeerReports slowPeers) throws IOException {
+ @Nonnull SlowPeerReports slowPeers,
+ BlocksStorageMovementResult[] blksMovementResults) throws IOException {
readLock();
try {
//get datanode commands
@@ -3656,6 +3658,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
}
+
+ // TODO: Handle blocks movement results sent by the coordinator datanode.
+ // This has to be revisited as part of HDFS-11029.
+ blockManager.getStoragePolicySatisfier()
+ .handleBlocksStorageMovementResults(blksMovementResults);
+
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f9cfa42..d4577a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1422,13 +1423,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
- @Nonnull SlowPeerReports slowPeers) throws IOException {
+ @Nonnull SlowPeerReports slowPeers,
+ BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
- slowPeers);
+ slowPeers,
+ blkMovementStatus);
}
@Override // DatanodeProtocol
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index b5aed37..fbe686a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -39,11 +39,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Setting storagePolicy on a file after the file write will only update the new
* storage policy type in Namespace, but physical block storage movement will
@@ -394,4 +397,24 @@ public class StoragePolicySatisfier implements Runnable {
return typeNodeMap.get(type);
}
}
+
+ // TODO: Temporarily keeping the results for assertion. This has to be
+ // revisited as part of HDFS-11029.
+ @VisibleForTesting
+ List<BlocksStorageMovementResult> results = new ArrayList<>();
+
+ /**
+ * Receives the movement results of a collection of blocks associated to a
+ * trackId.
+ *
+ * @param blksMovementResults
+ * movement status of the set of blocks associated to a trackId.
+ */
+ void handleBlocksStorageMovementResults(
+ BlocksStorageMovementResult[] blksMovementResults) {
+ if (blksMovementResults.length <= 0) {
+ return;
+ }
+ results.addAll(Arrays.asList(blksMovementResults));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
new file mode 100644
index 0000000..1afba34
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * This class represents, movement status of a set of blocks associated to a
+ * track Id.
+ */
+public class BlocksStorageMovementResult {
+
+ private final long trackId;
+ private final Status status;
+
+ /**
+ * SUCCESS - All the blocks associated with the track id have moved
+ * successfully, or the maximum possible movements have been done.
+ *
+ * <p>
+ * FAILURE - One or more of the trackId's block movements failed and need to
+ * be retried. For example, the selected target node is no longer running or
+ * has no space, so retrying with a newly selected target node might work.
+ */
+ public static enum Status {
+ SUCCESS, FAILURE;
+ }
+
+ /**
+ * BlocksStorageMovementResult constructor.
+ *
+ * @param trackId
+ * tracking identifier
+ * @param status
+ * block movement status
+ */
+ public BlocksStorageMovementResult(long trackId, Status status) {
+ this.trackId = trackId;
+ this.status = status;
+ }
+
+ public long getTrackId() {
+ return trackId;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 79a0132..c54f90c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -111,6 +111,8 @@ public interface DatanodeProtocol {
* @param slowPeers Details of peer DataNodes that were detected as being
* slow to respond to packet writes. Empty report if no
* slow peers were detected by the DataNode.
+ * @param blksMovementResults array of movement status of a set of blocks
+ * associated to a trackId.
* @throws IOException on error
*/
@Idempotent
@@ -123,7 +125,8 @@ public interface DatanodeProtocol {
int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
- @Nonnull SlowPeerReports slowPeers)
+ @Nonnull SlowPeerReports slowPeers,
+ BlocksStorageMovementResult[] blksMovementResults)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 7fd2781..f914935 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -177,6 +177,18 @@ message BlockStorageMovementProto {
}
/**
+ * Movement status of the set of blocks associated to a trackId.
+ */
+message BlocksStorageMovementResultProto {
+ enum Status {
+ SUCCESS = 1; // block movement succeeded
+ FAILURE = 2; // block movement failed and needs to retry
+ }
+ required uint64 trackID = 1;
+ required Status status = 2;
+}
+
+/**
* registration - Information of the datanode registering with the namenode
*/
message RegisterDatanodeRequestProto {
@@ -218,6 +230,7 @@ message VolumeFailureSummaryProto {
* cacheUsed - amount of cache used
* volumeFailureSummary - info about volume failures
* slowPeers - info about peer DataNodes that are suspected to be slow.
+ * blksMovementResults - status of the scheduled blocks movements
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -230,6 +243,7 @@ message HeartbeatRequestProto {
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
optional bool requestFullBlockReportLease = 9 [ default = false ];
repeated SlowPeerReportProto slowPeers = 10;
+ repeated BlocksStorageMovementResultProto blksMovementResults = 11;
}
/**
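
For reference, the new BlocksStorageMovementResultProto message is built through the protobuf-generated builder, as the PBHelper changes above do. A hypothetical usage sketch (not part of this commit):

import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;

public class ResultProtoSketch {
  public static void main(String[] args) {
    BlocksStorageMovementResultProto proto =
        BlocksStorageMovementResultProto.newBuilder()
            .setTrackID(42L)                                            // required uint64 trackID = 1
            .setStatus(BlocksStorageMovementResultProto.Status.FAILURE) // required Status status = 2
            .build();
    System.out.println(proto.getTrackID() + " " + proto.getStatus());
  }
}
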
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index a5c6e0d..a146322 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -114,7 +115,7 @@ public class TestNameNodePrunesMissingStorages {
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
- 0, null, true, SlowPeerReports.EMPTY_REPORT);
+ 0, null, true, SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index cf43fd0..a1275df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -138,7 +139,8 @@ public class InternalDataNodeTestUtils {
Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
- Mockito.any(SlowPeerReports.class))).thenReturn(
+ Mockito.any(SlowPeerReports.class),
+ Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index c6b38ee..1f670d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -121,6 +122,8 @@ public class TestBPOfferService {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
.when(mockDn).getMetrics();
+ Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
+ .getStoragePolicySatisfyWorker();
// Set up a simulated dataset with our fake BP
mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -154,7 +157,8 @@ public class TestBPOfferService {
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
- Mockito.any(SlowPeerReports.class));
+ Mockito.any(SlowPeerReports.class),
+ Mockito.any(BlocksStorageMovementResult[].class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;
@@ -344,6 +348,8 @@ public class TestBPOfferService {
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
when(mockDn).getMetrics();
+ Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
+ .getStoragePolicySatisfyWorker();
final AtomicInteger count = new AtomicInteger();
Mockito.doAnswer(new Answer<Void>() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index b64f1e2..3667151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -219,7 +220,8 @@ public class TestBlockRecovery {
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
- Mockito.any(SlowPeerReports.class)))
+ Mockito.any(SlowPeerReports.class),
+ Mockito.any(BlocksStorageMovementResult[].class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 8a9f0b8..3f56fd4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -169,7 +170,8 @@ public class TestDataNodeLifeline {
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean(),
- any(SlowPeerReports.class));
+ any(SlowPeerReports.class),
+ any(BlocksStorageMovementResult[].class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -233,7 +235,8 @@ public class TestDataNodeLifeline {
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean(),
- any(SlowPeerReports.class));
+ any(SlowPeerReports.class),
+ any(BlocksStorageMovementResult[].class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index c94f74e..1aef383 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -220,7 +221,8 @@ public class TestDatanodeProtocolRetryPolicy {
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
- Mockito.any(SlowPeerReports.class));
+ Mockito.any(SlowPeerReports.class),
+ Mockito.any(BlocksStorageMovementResult[].class));
dn = new DataNode(conf, locations, null, null) {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index eb015c0..c6764db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -173,7 +174,8 @@ public class TestFsDatasetCache {
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
- anyBoolean(), any(SlowPeerReports.class));
+ anyBoolean(), any(SlowPeerReports.class),
+ (BlocksStorageMovementResult[]) any());
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index ea3eec3..1eb44e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
@@ -191,12 +190,12 @@ public class TestStoragePolicySatisfyWorker {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
- List<BlockMovementResult> completedBlocks = worker
- .getBlocksMovementsCompletionHandler().getCompletedBlocks();
+ List<BlocksStorageMovementResult> completedBlocks = worker
+ .getBlocksMovementsCompletionHandler().getBlksMovementResults();
int failedCount = 0;
- for (BlockMovementResult blockMovementResult : completedBlocks) {
- if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE ==
- blockMovementResult.getStatus()) {
+ for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
+ if (blkMovementResult.getStatus() ==
+ BlocksStorageMovementResult.Status.FAILURE) {
failedCount++;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 2b793e9..94a7908 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -108,7 +109,8 @@ public class TestStorageReport {
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
- Mockito.any(SlowPeerReports.class));
+ Mockito.any(SlowPeerReports.class),
+ Mockito.any(BlocksStorageMovementResult[].class));
StorageReport[] reports = captor.getValue();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index b86b3fb..d8e83df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -953,7 +954,8 @@ public class NNThroughputBenchmark implements Tool {
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT).getCommands();
+ SlowPeerReports.EMPTY_REPORT,
+ new BlocksStorageMovementResult[0]).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@@ -1003,7 +1005,7 @@ public class NNThroughputBenchmark implements Tool {
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT).getCommands();
+ SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 2b8faf4..43a2fa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -124,7 +125,7 @@ public class NameNodeAdapter {
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT);
+ SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
}
public static boolean setReplication(final FSNamesystem ns,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index b9161c3..69851d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -134,7 +135,8 @@ public class TestDeadDatanode {
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
- SlowPeerReports.EMPTY_REPORT).getCommands();
+ SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]).getCommands();
+
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 37664b5..cbfdfc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -146,6 +148,54 @@ public class TestStoragePolicySatisfier {
}
}
+ /**
+ * Tests to verify that the block storage movement results will be propagated
+ * to Namenode via datanode heartbeat.
+ */
+ @Test(timeout = 300000)
+ public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+ try {
+ // Change policy to ONE_SSD
+ distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+ // Make sure SSD-based nodes are added to the cluster. Adding SSD-based
+ // datanodes.
+ startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+
+ // Wait till the block is moved to SSD areas
+ waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
+ waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+
+ // TODO: Temporarily using the results from StoragePolicySatisfier class.
+ // This has to be revisited as part of HDFS-11029.
+ waitForBlocksMovementResult(1, 30000);
+ } finally {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ private void waitForBlocksMovementResult(int expectedResultsCount,
+ int timeout) throws TimeoutException, InterruptedException {
+ BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+ final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("expectedResultsCount={} actualResultsCount={}",
+ expectedResultsCount, sps.results.size());
+ return expectedResultsCount == sps.results.size();
+ }
+ }, 100, timeout);
+ }
+
private void writeContent(final DistributedFileSystem dfs,
final String fileName) throws IOException {
// write to DISK