Posted to common-commits@hadoop.apache.org by eh...@apache.org on 2019/12/02 17:11:58 UTC

[hadoop] 01/04: HDFS-13310. The DatanodeProtocol should have a DNA_BACKUP to back up blocks. Original patch contributed by Ewan Higgs. Follow-up work and fixes contributed by Virajith Jalaparthi.

This is an automated email from the ASF dual-hosted git repository.

ehiggs pushed a commit to branch HDFS-12090
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 4f7bd416853143d9becaa51ed0d2158b4613a988
Author: Ewan Higgs <ew...@wdc.com>
AuthorDate: Mon Jul 23 13:14:04 2018 +0200

    HDFS-13310. The DatanodeProtocol should have a DNA_BACKUP to back up blocks. Original patch contributed by Ewan Higgs. Follow-up work and fixes contributed by Virajith Jalaparthi.
---
 .../protocol/BlockSyncTaskExecutionFeedback.java   |  67 +++++++
 .../server/protocol/SyncTaskExecutionOutcome.java  |  25 +++
 .../server/protocol/SyncTaskExecutionResult.java   |  46 +++++
 .../DatanodeProtocolClientSideTranslatorPB.java    |   8 +-
 .../DatanodeProtocolServerSideTranslatorPB.java    |   6 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java    | 208 ++++++++++++++++++++-
 .../server/blockmanagement/DatanodeManager.java    |   4 +-
 .../hdfs/server/datanode/BPServiceActor.java       |   9 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |   8 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |   8 +-
 .../hadoop/hdfs/server/protocol/BlockSyncTask.java |  83 ++++++++
 .../protocol/BulkSyncTaskExecutionFeedback.java    |  36 ++++
 .../hdfs/server/protocol/DatanodeProtocol.java     |  20 +-
 .../hadoop/hdfs/server/protocol/SyncCommand.java   |  39 ++++
 .../src/main/proto/DatanodeProtocol.proto          |  88 ++++++++-
 .../blockmanagement/TestDatanodeManager.java       |   2 +-
 .../TestNameNodePrunesMissingStorages.java         |   2 +-
 .../server/datanode/InternalDataNodeTestUtils.java |   1 +
 .../hdfs/server/datanode/TestBPOfferService.java   |   5 +-
 .../hdfs/server/datanode/TestBlockRecovery.java    |   1 +
 .../hdfs/server/datanode/TestDataNodeLifeline.java |   9 +-
 .../datanode/TestDatanodeProtocolRetryPolicy.java  |   1 +
 .../hdfs/server/datanode/TestStorageReport.java    |   4 +-
 .../fsdataset/impl/TestFsDatasetCache.java         |   4 +-
 .../server/namenode/NNThroughputBenchmark.java     |   8 +-
 .../hdfs/server/namenode/NameNodeAdapter.java      |   5 +-
 .../hdfs/server/namenode/TestDeadDatanode.java     |   4 +-
 27 files changed, 653 insertions(+), 48 deletions(-)
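
Taken together, the changes below form a request/response loop: the NameNode
hands a DataNode a SyncCommand (new action DNA_BACKUP) carrying BlockSyncTasks,
and the DataNode reports outcomes in its next heartbeat as
BulkSyncTaskExecutionFeedback. A minimal sketch of the feedback half, using
only classes added in this commit (the mount id and payload bytes are
illustrative, not from the patch):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.UUID;
    import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
    import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
    import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;

    public class SyncFeedbackSketch {
      public static void main(String[] args) {
        UUID taskId = UUID.randomUUID();
        String syncMountId = "backup-mount-1";   // hypothetical mount id

        // A successful task carries an opaque result (e.g. a PartHandle).
        SyncTaskExecutionResult result = new SyncTaskExecutionResult(
            ByteBuffer.wrap(new byte[] {0x01, 0x02}), 128L);

        BlockSyncTaskExecutionFeedback ok =
            BlockSyncTaskExecutionFeedback.finishedSuccessfully(
                taskId, syncMountId, result);

        // BPServiceActor attaches a bulk wrapper like this to each heartbeat.
        BulkSyncTaskExecutionFeedback bulk =
            new BulkSyncTaskExecutionFeedback(Arrays.asList(ok));
        System.out.println("feedbacks: " + bulk.getFeedbacks().size());
      }
    }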

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java
new file mode 100644
index 0000000..2e5393e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.UUID;
+
+/**
+ * Feedback for a BlockSyncTask.
+ */
+public class BlockSyncTaskExecutionFeedback {
+
+  private UUID syncTaskId;
+  private SyncTaskExecutionOutcome outcome;
+  private SyncTaskExecutionResult result;
+  private String syncMountId;
+
+  public BlockSyncTaskExecutionFeedback(UUID syncTaskId,
+      SyncTaskExecutionOutcome outcome, SyncTaskExecutionResult result,
+      String syncMountId) {
+    this.syncTaskId = syncTaskId;
+    this.outcome = outcome;
+    this.result = result;
+    this.syncMountId = syncMountId;
+  }
+
+  public static BlockSyncTaskExecutionFeedback finishedSuccessfully(
+      UUID syncTaskId, String syncMountId, SyncTaskExecutionResult result) {
+    return new BlockSyncTaskExecutionFeedback(syncTaskId,
+        SyncTaskExecutionOutcome.FINISHED_SUCCESSFULLY, result, syncMountId);
+  }
+
+  public static BlockSyncTaskExecutionFeedback failedWithException(
+      UUID syncTaskId, String syncMountId, Exception e) {
+    return new BlockSyncTaskExecutionFeedback(syncTaskId,
+        SyncTaskExecutionOutcome.EXCEPTION, null, syncMountId);
+  }
+
+  public UUID getSyncTaskId() {
+    return syncTaskId;
+  }
+
+  public SyncTaskExecutionOutcome getOutcome() {
+    return outcome;
+  }
+
+  public SyncTaskExecutionResult getResult() {
+    return result;
+  }
+
+  public String getSyncMountId() {
+    return syncMountId;
+  }
+}
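
One caveat on the failure path above: failedWithException records the
EXCEPTION outcome but, as written, never stores the exception argument, so
the cause is lost unless the caller logs it. A hedged sketch of a caller
doing exactly that (the task runner and its failure are simulated):

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;

    public class FeedbackOnFailure {
      // Hypothetical runner: executes one sync task and converts the
      // outcome into feedback. The upload itself is stubbed to fail.
      static BlockSyncTaskExecutionFeedback run(UUID taskId, String mountId) {
        try {
          throw new IOException("simulated upload failure");
        } catch (Exception e) {
          // The factory keeps only the outcome, so log the cause here
          // if it is to be preserved at all.
          System.err.println("sync task " + taskId + " failed: " + e);
          return BlockSyncTaskExecutionFeedback.failedWithException(
              taskId, mountId, e);
        }
      }

      public static void main(String[] args) {
        System.out.println(run(UUID.randomUUID(), "backup-mount-1").getOutcome());
      }
    }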
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java
new file mode 100644
index 0000000..492575b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * SyncTaskExecutionOutcome indicates whether a SyncTask finished
+ * successfully or failed with an exception.
+ */
+public enum SyncTaskExecutionOutcome {
+  FINISHED_SUCCESSFULLY,
+  EXCEPTION
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java
new file mode 100644
index 0000000..b623dc5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Result of a SyncTask.
+ */
+public class SyncTaskExecutionResult {
+
+  /** The opaque byte-stream result of a task, e.g. a PartHandle. */
+  private ByteBuffer result;
+  private Long numberOfBytes;
+
+  public SyncTaskExecutionResult(ByteBuffer result, Long numberOfBytes) {
+    this.result = result;
+    this.numberOfBytes = numberOfBytes;
+  }
+
+  public static SyncTaskExecutionResult emptyResult() {
+    return new SyncTaskExecutionResult(ByteBuffer.wrap(new byte[0]), 0L);
+  }
+
+  public ByteBuffer getResult() {
+    return result;
+  }
+
+  public Long getNumberOfBytes() {
+    return numberOfBytes;
+  }
+}
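
Two result shapes are possible with this class; a small sketch using only
the API above (the payload and sizes are made up):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;

    public class ResultShapes {
      public static void main(String[] args) {
        // Zero-length result: the task completed without a payload.
        SyncTaskExecutionResult empty = SyncTaskExecutionResult.emptyResult();
        System.out.println(empty.getNumberOfBytes());   // 0

        // Payload-bearing result, e.g. an opaque PartHandle from a
        // multipart upload, plus the number of bytes copied.
        SyncTaskExecutionResult part = new SyncTaskExecutionResult(
            ByteBuffer.wrap("part-1-handle".getBytes()), 4096L);
        System.out.println(part.getNumberOfBytes() + " bytes copied");
      }
    }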
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index e4125dc..20b314c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -138,8 +139,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks)
-          throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback feedback) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -162,6 +163,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     if (slowDisks.haveSlowDisks()) {
       builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
     }
+    if (feedback != null && !feedback.getFeedbacks().isEmpty()) {
+      builder.setBulkSyncTaskExecutionFeedback(PBHelper.convert(feedback));
+    }
 
     HeartbeatResponseProto resp;
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 5cba284..a51ce85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -109,6 +109,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   @Override
   public HeartbeatResponseProto sendHeartbeat(RpcController controller,
       HeartbeatRequestProto request) throws ServiceException {
+
     HeartbeatResponse response;
     try {
       final StorageReport[] report = PBHelperClient.convertStorageReports(
@@ -122,7 +123,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
-          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
+          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
+          PBHelper.convertBulkSyncTaskExecutionFeedback(
+              request.getBulkSyncTaskExecutionFeedback())
+      );
     } catch (IOException e) {
       throw new ServiceException(e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index baec6fa..a62a00d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -24,9 +27,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 import com.google.protobuf.ByteString;
-
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -43,38 +47,44 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommand
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockSyncTaskProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BulkSyncTaskExecutionFeedbackProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
-    .SlowDiskReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowDiskReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionFeedbackProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionOutcomeProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionResultProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointSignatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.ExportedBlockKeysProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto.NamenodeRoleProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamespaceInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RecoveringBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogManifestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogProto;
@@ -89,18 +99,23 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionOutcome;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -119,7 +134,9 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
+import org.apache.hadoop.hdfs.server.protocol.SyncCommand;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.ipc.ClientId;
 
 /**
  * Utilities for converting protobuf classes to and from implementation classes
@@ -469,11 +486,52 @@ public class PBHelper {
       return PBHelper.convert(proto.getBlkIdCmd());
     case BlockECReconstructionCommand:
       return PBHelper.convert(proto.getBlkECReconstructionCmd());
+    case SyncCommand:
+      return PBHelper.convert(proto.getSyncCommand());
     default:
       return null;
     }
   }
-  
+
+  private static SyncCommand convert(SyncCommandProto backupCommand) {
+    List<BlockSyncTaskProto> syncTasksProtoList =
+        backupCommand.getSyncTasksList();
+    List<BlockSyncTask> syncTasksList =
+        new ArrayList<>(syncTasksProtoList.size());
+    for (BlockSyncTaskProto syncTaskProto : syncTasksProtoList) {
+      syncTasksList.add(convertSyncTask(syncTaskProto));
+    }
+
+    return new SyncCommand(DatanodeProtocol.DNA_BACKUP, syncTasksList);
+  }
+
+  private static BlockSyncTask convertSyncTask(
+      BlockSyncTaskProto syncTaskProto) {
+    SyncTaskIdProto syncTaskIdProto = syncTaskProto.getSyncTaskId();
+    UUID syncTaskId = convert(syncTaskIdProto);
+    try {
+      return new BlockSyncTask(syncTaskId,
+          new URI(syncTaskProto.getUri()),
+          PBHelperClient.convertLocatedBlocks(
+              syncTaskProto.getLocatedBlocksList()),
+          syncTaskProto.getPartNumber(),
+          syncTaskProto.getUploadHandle().toByteArray(),
+          syncTaskProto.getOffset(),
+          syncTaskProto.getLength(),
+          syncTaskIdProto.getSyncMountId());
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  public static UUID convert(SyncTaskIdProto syncTaskIdProto) {
+    byte[] clientId = syncTaskIdProto.getSyncTaskId().toByteArray();
+    long syncTaskIdMsb = ClientId.getMsb(clientId);
+    long syncTaskIdLsb = ClientId.getLsb(clientId);
+    return new UUID(syncTaskIdMsb, syncTaskIdLsb);
+  }
+
+
   public static BalancerBandwidthCommandProto convert(
       BalancerBandwidthCommand bbCmd) {
     return BalancerBandwidthCommandProto.newBuilder()
@@ -603,6 +661,10 @@ public class PBHelper {
           .setBlkECReconstructionCmd(
               convert((BlockECReconstructionCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_BACKUP:
+      builder.setCmdType(DatanodeCommandProto.Type.SyncCommand)
+          .setSyncCommand(convert((SyncCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -1128,4 +1190,130 @@ public class PBHelper {
 
     return new FileRegion(block, providedStorageLocation);
   }
+
+  private static SyncCommandProto convert(SyncCommand syncCommand) {
+    SyncCommandProto.Builder builder = SyncCommandProto.newBuilder();
+
+    List<BlockSyncTaskProto> syncTaskProtos = syncCommand.getSyncTasks()
+        .stream()
+        .map(syncTask -> convert(syncTask))
+        .collect(Collectors.toList());
+
+    builder.addAllSyncTasks(syncTaskProtos);
+
+    return builder.build();
+  }
+
+  private static BlockSyncTaskProto convert(BlockSyncTask blockSyncTask) {
+    BlockSyncTaskProto.Builder builder = BlockSyncTaskProto.newBuilder();
+    builder.addAllLocatedBlocks(
+        PBHelperClient.convertLocatedBlocks2(blockSyncTask.getLocatedBlocks()));
+    builder.setUploadHandle(
+        ByteString.copyFrom(blockSyncTask.getUploadHandle()));
+    builder.setPartNumber(blockSyncTask.getPartNumber());
+    builder.setUri(blockSyncTask.getRemoteURI().toString());
+    builder.setOffset(blockSyncTask.getOffset());
+    builder.setLength(blockSyncTask.getLength());
+
+    return builder.build();
+  }
+
+  public static SyncTaskIdProto convert(UUID syncTaskId, String syncMountId) {
+    SyncTaskIdProto.Builder builder = SyncTaskIdProto.newBuilder();
+    ByteBuffer syncTaskIdBytes = ByteBuffer.wrap(new byte[16]);
+    syncTaskIdBytes.putLong(syncTaskId.getMostSignificantBits());
+    syncTaskIdBytes.putLong(syncTaskId.getLeastSignificantBits());
+    builder.setSyncTaskId(ByteString.copyFrom(syncTaskIdBytes.array()));
+    builder.setSyncMountId(syncMountId);
+    return builder.build();
+  }
+
+
+  public static BulkSyncTaskExecutionFeedbackProto convert(
+      BulkSyncTaskExecutionFeedback bulkFeedback) {
+    return BulkSyncTaskExecutionFeedbackProto.newBuilder()
+        .addAllFeedbacks(bulkFeedback.getFeedbacks().stream()
+            .map(f -> convert(f)).collect(Collectors.toList()))
+        .build();
+  }
+
+  public static SyncTaskExecutionFeedbackProto convert(
+      BlockSyncTaskExecutionFeedback feedback) {
+    SyncTaskExecutionFeedbackProto.Builder builder =
+        SyncTaskExecutionFeedbackProto.newBuilder()
+            .setSyncTaskId(
+                convert(feedback.getSyncTaskId(), feedback.getSyncMountId()))
+            .setOutcome(convert(feedback.getOutcome()));
+    if (feedback.getResult() != null) {
+      builder.setResult(convert(feedback.getResult()));
+    }
+    return builder.build();
+  }
+
+  public static SyncTaskExecutionOutcomeProto convert(
+      SyncTaskExecutionOutcome outcome) {
+    switch (outcome) {
+    case FINISHED_SUCCESSFULLY:
+      return SyncTaskExecutionOutcomeProto.FINISHED_SUCCESSFULLY;
+    case EXCEPTION:
+      return SyncTaskExecutionOutcomeProto.EXCEPTION;
+    default:
+      throw new IllegalArgumentException(
+          "Unknown SyncTaskExecutionOutcome: " + outcome);
+    }
+  }
+
+  public static SyncTaskExecutionResultProto convert(
+      SyncTaskExecutionResult result) {
+    SyncTaskExecutionResultProto.Builder builder =
+        SyncTaskExecutionResultProto.newBuilder();
+    if (result.getResult() != null) {
+      builder.setResult(ByteString.copyFrom(result.getResult()));
+    }
+    if (result.getNumberOfBytes() != null) {
+      builder.setNumberOfBytes(result.getNumberOfBytes());
+    }
+    return builder.build();
+  }
+
+  public static BulkSyncTaskExecutionFeedback convertBulkSyncTaskExecutionFeedback(
+      BulkSyncTaskExecutionFeedbackProto bulkSyncTaskExecutionFeedback) {
+    return new BulkSyncTaskExecutionFeedback(
+        bulkSyncTaskExecutionFeedback.getFeedbacksList().stream()
+            .map(feedback -> convert(feedback)).collect(Collectors.toList()));
+  }
+
+  public static BlockSyncTaskExecutionFeedback convert(
+      SyncTaskExecutionFeedbackProto feedback) {
+    return new BlockSyncTaskExecutionFeedback(convert(feedback.getSyncTaskId()),
+        convert(feedback.getOutcome()),
+        feedback.hasResult() ? convert(feedback.getResult()) : null,
+        feedback.getSyncTaskId().getSyncMountId());
+  }
+
+  public static SyncTaskExecutionOutcome convert(
+      SyncTaskExecutionOutcomeProto outcome) {
+    switch (outcome) {
+    case FINISHED_SUCCESSFULLY:
+      return SyncTaskExecutionOutcome.FINISHED_SUCCESSFULLY;
+    case EXCEPTION:
+      return SyncTaskExecutionOutcome.EXCEPTION;
+    default:
+      throw new IllegalArgumentException(
+          "Unknown SyncTaskExecutionOutcomeProto: " + outcome);
+    }
+  }
+
+  public static SyncTaskExecutionResult convert(
+      SyncTaskExecutionResultProto result) {
+    byte[] bytes = null;
+    if (result.getResult() != null) {
+      bytes = result.getResult().toByteArray();
+    }
+
+    ByteBuffer byteBuffer =
+        (bytes == null) ? null : ByteBuffer.wrap(bytes).asReadOnlyBuffer();
+    return new SyncTaskExecutionResult(byteBuffer, result.getNumberOfBytes());
+  }
+
 }
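
The SyncTaskIdProto conversion above encodes the UUID as a 16-byte buffer
(most-significant long first), and the decoder rebuilds it through
ClientId.getMsb/getLsb, so the two converters round-trip. A quick sketch,
assuming only the two public PBHelper methods added in this patch (the
mount id is illustrative):

    import java.util.UUID;
    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskIdProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    public class SyncTaskIdRoundTrip {
      public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        SyncTaskIdProto proto = PBHelper.convert(original, "backup-mount-1");
        UUID decoded = PBHelper.convert(proto);
        // Holds because encode and decode agree on the MSB-then-LSB layout.
        System.out.println(original.equals(decoded));   // true
      }
    }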
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 8adb03d..1c6a1f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1660,7 +1660,9 @@ public class DatanodeManager {
       int maxTransfers, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
+      throws IOException {
     final DatanodeDescriptor nodeinfo;
     try {
       nodeinfo = getDatanode(nodeReg);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 495035e..0d2f514 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -534,6 +535,10 @@ class BPServiceActor implements Runnable {
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
             SlowDiskReports.EMPTY_REPORT;
 
+    // TODO - collect feedback from SyncTasks here.
+    BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback =
+        new BulkSyncTaskExecutionFeedback(Collections.emptyList());
+
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -544,13 +549,13 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary,
         requestBlockReportLease,
         slowPeers,
-        slowDisks);
+        slowDisks,
+        bulkSyncTaskExecutionFeedback);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
       scheduler.scheduleNextOutlierReport();
     }
-
     return response;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8d1884e..d78d940 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
 import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.util.Time;
 import static org.apache.hadoop.util.Time.now;
@@ -3988,8 +3989,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks)
-          throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
+      throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3998,7 +4000,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
           xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
-          slowPeers, slowDisks);
+          slowPeers, slowDisks, bulkSyncTaskExecutionFeedback);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e483961..c6e00de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -160,6 +160,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1564,14 +1565,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks)
-          throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
+      throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers, slowDisks);
+        slowPeers, slowDisks, bulkSyncTaskExecutionFeedback);
   }
 
   @Override // DatanodeProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java
new file mode 100644
index 0000000..875a409
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A BlockSyncTask is an operation that is sent to the datanodes to copy
+ * blocks to an external storage endpoint as a part of an orchestrated
+ * synchronization across multiple datanodes.
+ * BlockSyncTask is intended to be an immutable POJO.
+ */
+public class BlockSyncTask {
+  private final UUID syncTaskId;
+  private final URI remoteURI;
+  private final List<LocatedBlock> locatedBlocks;
+  private String syncMountId;
+  private final int partNumber;
+  private byte[] uploadHandle;
+  private final int offset;
+  private final long length;
+
+  public BlockSyncTask(UUID syncTaskId, URI remoteURI,
+      List<LocatedBlock> locatedBlocks, Integer partNumber, byte[] uploadHandle,
+      int offset, long length, String syncMountId) {
+    this.syncTaskId = syncTaskId;
+    this.remoteURI = remoteURI;
+    this.locatedBlocks = locatedBlocks;
+    this.syncMountId = syncMountId;
+    this.partNumber = partNumber;
+    this.uploadHandle = uploadHandle;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public int getPartNumber() {
+    return partNumber;
+  }
+
+  public byte[] getUploadHandle() {
+    return uploadHandle;
+  }
+
+  public int getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public UUID getSyncTaskId() {
+    return syncTaskId;
+  }
+
+  public URI getRemoteURI() {
+    return remoteURI;
+  }
+
+  public List<LocatedBlock> getLocatedBlocks() {
+    return locatedBlocks;
+  }
+
+  public String getSyncMountId() {
+    return syncMountId;
+  }
+}
\ No newline at end of file
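
For reference, a construction sketch for the immutable POJO above. Every
concrete value is illustrative (the patch does not prescribe them), and the
empty block list stands in for real LocatedBlocks:

    import java.net.URI;
    import java.util.Collections;
    import java.util.UUID;
    import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;

    public class BuildBlockSyncTask {
      public static void main(String[] args) throws Exception {
        BlockSyncTask task = new BlockSyncTask(
            UUID.randomUUID(),
            new URI("s3a://bucket/backups/file1"), // hypothetical endpoint
            Collections.emptyList(),               // LocatedBlocks to copy
            1,                                     // multipart part number
            new byte[0],                           // opaque upload handle
            0,                                     // offset
            134217728L,                            // length in bytes (128 MB)
            "backup-mount-1");                     // sync mount id
        System.out.println(task.getRemoteURI() + " part " + task.getPartNumber());
      }
    }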
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java
new file mode 100644
index 0000000..0d459e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.Collection;
+
+/**
+ * Feedback for a collection of {@link BlockSyncTask}s.
+ */
+public class BulkSyncTaskExecutionFeedback {
+
+  private Collection<BlockSyncTaskExecutionFeedback> feedbacks;
+
+  public BulkSyncTaskExecutionFeedback(
+      Collection<BlockSyncTaskExecutionFeedback> feedbacks) {
+    this.feedbacks = feedbacks;
+  }
+
+  public Collection<BlockSyncTaskExecutionFeedback> getFeedbacks() {
+    return feedbacks;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 5680ef3..692a2e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -81,6 +81,7 @@ public interface DatanodeProtocol {
   final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
   int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
   int DNA_DROP_SPS_WORK_COMMAND = 13; // drop sps work command
+  final static int DNA_BACKUP = 14; // back up data to PROVIDED stores.
 
   /** 
    * Register Datanode.
@@ -112,20 +113,19 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
+   * @param bulkSyncTaskExecutionFeedback Result of the execution of the
+   *                                      sync tasks.
    * @throws IOException on error
    */
   @Idempotent
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
-                                       StorageReport[] reports,
-                                       long dnCacheCapacity,
-                                       long dnCacheUsed,
-                                       int xmitsInProgress,
-                                       int xceiverCount,
-                                       int failedVolumes,
-                                       VolumeFailureSummary volumeFailureSummary,
-                                       boolean requestFullBlockReportLease,
-                                       @Nonnull SlowPeerReports slowPeers,
-                                       @Nonnull SlowDiskReports slowDisks)
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary,
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
       throws IOException;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java
new file mode 100644
index 0000000..7e2c242
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.List;
+
+/**
+ * A SyncCommand is an instruction to a DataNode to back up the given
+ * blocks to the specified PROVIDED storage endpoint.
+ */
+public class SyncCommand extends DatanodeCommand {
+
+  private final List<BlockSyncTask> syncTasks;
+
+  public SyncCommand(int action, List<BlockSyncTask> syncTasks) {
+    super(action);
+    this.syncTasks = syncTasks;
+  }
+
+  public List<BlockSyncTask> getSyncTasks() {
+    return syncTasks;
+  }
+
+}
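
On the NameNode side the command is issued with the new DNA_BACKUP action
code; PBHelper then maps it to DatanodeCommandProto.Type.SyncCommand. A
minimal sketch (the single task is built as in the BlockSyncTask example
above, with illustrative values):

    import java.net.URI;
    import java.util.Collections;
    import java.util.UUID;
    import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
    import org.apache.hadoop.hdfs.server.protocol.SyncCommand;

    public class IssueSyncCommand {
      public static void main(String[] args) throws Exception {
        BlockSyncTask task = new BlockSyncTask(UUID.randomUUID(),
            new URI("s3a://bucket/backups/file1"), Collections.emptyList(),
            1, new byte[0], 0, 1024L, "backup-mount-1");
        // DNA_BACKUP (= 14) is the action constant added by this patch.
        SyncCommand cmd = new SyncCommand(
            DatanodeProtocol.DNA_BACKUP, Collections.singletonList(task));
        System.out.println("action=" + cmd.getAction()
            + " tasks=" + cmd.getSyncTasks().size());
      }
    }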
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 0e24130..66df8ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -60,6 +60,7 @@ message DatanodeCommandProto {
     NullDatanodeCommand = 7;
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
+    SyncCommand = 10;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -74,6 +75,7 @@ message DatanodeCommandProto {
   optional RegisterCommandProto registerCmd = 7;
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
+  optional SyncCommandProto syncCommand = 10;
 }
 
 /**
@@ -154,6 +156,89 @@ message BlockECReconstructionCommandProto {
   repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
 }
 
+message SyncTaskIdProto {
+  required bytes syncTaskId = 1;
+  required string syncMountId = 2;
+}
+
+/**
+ * Instructs the datanode to perform the given backup (sync) tasks.
+ */
+message SyncCommandProto {
+  repeated BlockSyncTaskProto syncTasks = 1;
+}
+
+/**
+ * A block synchronization task as part of an orchestrated synchronization
+ * across potentially multiple datanodes (i.e. multipart put part).
+ */
+message BlockSyncTaskProto {
+  required SyncTaskIdProto syncTaskId = 1;
+
+  required bytes uploadHandle = 2;
+  required int32 partNumber = 3;
+  repeated LocatedBlockProto locatedBlocks = 4;
+  required string uri = 5;
+  required int32 offset = 6;
+  required int64 length = 7;
+}
+
+/**
+ * Block storage movement information
+ */
+message BlockMovingInfoProto {
+  required BlockProto block = 1;
+  required DatanodeInfoProto sourceDnInfo = 2;
+  required DatanodeInfoProto targetDnInfo = 3;
+  required StorageTypeProto sourceStorageType = 4;
+  required StorageTypeProto targetStorageType = 5;
+}
+
+/**
+ * Blocks for which storage movements has been attempted and finished
+ * with either success or failure.
+ */
+message BlocksStorageMoveAttemptFinishedProto {
+  repeated BlockProto blocks = 1;
+}
+
+/**
+ * A collection of feedbacks for a collection of sync tasks.
+ */
+message BulkSyncTaskExecutionFeedbackProto {
+  repeated SyncTaskExecutionFeedbackProto feedbacks = 1;
+}
+
+/**
+ * Feedback for a sync task that has been executed.
+ * syncTaskId - identifier for the task.
+ * outcome - success/error.
+ * result - if the outcome is successful, the results of the sync task.
+ */
+message SyncTaskExecutionFeedbackProto {
+  required SyncTaskIdProto syncTaskId = 1;
+  required SyncTaskExecutionOutcomeProto outcome = 2;
+  optional SyncTaskExecutionResultProto result = 3;
+}
+
+/**
+ * Success or failure indication of a sync task.
+ */
+enum SyncTaskExecutionOutcomeProto {
+  FINISHED_SUCCESSFULLY = 0;
+  EXCEPTION = 1;
+}
+
+/**
+ * result - the opaque result data from the sync task.
+ * numberOfBytes - the number of bytes copied.
+ */
+message SyncTaskExecutionResultProto {
+  optional bytes result = 1;
+  optional int64 numberOfBytes = 2;
+}
+
 /**
  * registration - Information of the datanode registering with the namenode
  */
@@ -211,6 +296,7 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
+  optional BulkSyncTaskExecutionFeedbackProto bulkSyncTaskExecutionFeedback = 12;
 }
 
 /**
@@ -276,7 +362,7 @@ message StorageBlockReportProto {
  */
 message BlockReportResponseProto {
   optional DatanodeCommandProto cmd = 1;
-} 
+}
 
 /**
  * registration - datanode registration information
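
Since the messages above generate the usual protobuf builders, the feedback
can also be assembled directly at the wire level. A sketch against the
generated Java API (field and message names exactly as declared above; the
16 zero bytes stand in for a real UUID encoding):

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BulkSyncTaskExecutionFeedbackProto;
    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionFeedbackProto;
    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionOutcomeProto;
    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskIdProto;

    public class BuildFeedbackProto {
      public static void main(String[] args) {
        SyncTaskIdProto id = SyncTaskIdProto.newBuilder()
            .setSyncTaskId(ByteString.copyFrom(new byte[16]))
            .setSyncMountId("backup-mount-1")
            .build();
        SyncTaskExecutionFeedbackProto feedback =
            SyncTaskExecutionFeedbackProto.newBuilder()
                .setSyncTaskId(id)
                .setOutcome(SyncTaskExecutionOutcomeProto.FINISHED_SUCCESSFULLY)
                .build();
        BulkSyncTaskExecutionFeedbackProto bulk =
            BulkSyncTaskExecutionFeedbackProto.newBuilder()
                .addFeedbacks(feedback)
                .build();
        System.out.println(bulk.getFeedbacksCount());   // 1
      }
    }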
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index e8e6b94..6d24321 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -737,7 +737,7 @@ public class TestDatanodeManager {
     Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
     DatanodeCommand[] cmds = dm.handleHeartbeat(
         dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, null);
 
     long expectedNumCmds = Arrays.stream(
         new int[]{numReplicationTasks, numECTasks})
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index f64767a..d8e53ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -117,7 +117,7 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT);
+          SlowDiskReports.EMPTY_REPORT, null);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index 97302b5..5586f89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -163,6 +163,7 @@ public class InternalDataNodeTestUtils {
             Mockito.anyInt(), Mockito.any(),
             Mockito.anyBoolean(),
             Mockito.any(),
+            Mockito.any(),
             Mockito.any())).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 2353992..26c07d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -160,7 +162,8 @@ public class TestBPOfferService {
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
-          Mockito.any(SlowDiskReports.class));
+          Mockito.any(SlowDiskReports.class),
+          Mockito.any(BulkSyncTaskExecutionFeedback.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index f12285c..fb8768d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -228,6 +228,7 @@ public class TestBlockRecovery {
             Mockito.any(),
             Mockito.anyBoolean(),
             Mockito.any(),
+            Mockito.any(),
             Mockito.any()))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index dcfbf02..7c62edc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -25,13 +25,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
 
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyLong;
@@ -172,7 +173,8 @@ public class TestDataNodeLifeline {
             any(),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class));
+            any(SlowDiskReports.class),
+            any(BulkSyncTaskExecutionFeedback.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -241,7 +243,8 @@ public class TestDataNodeLifeline {
             any(),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class));
+            any(SlowDiskReports.class),
+            any(BulkSyncTaskExecutionFeedback.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index 93f9f44..bb4c125 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -216,6 +216,7 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.any(),
            Mockito.anyBoolean(),
            Mockito.any(),
+           Mockito.any(),
            Mockito.any());
 
     dn = new DataNode(conf, locations, null, null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 9df6209..875cef7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -110,7 +111,8 @@ public class TestStorageReport {
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         any(), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
-        Mockito.any(SlowDiskReports.class));
+        Mockito.any(SlowDiskReports.class),
+        Mockito.any(BulkSyncTaskExecutionFeedback.class));
 
     StorageReport[] reports = captor.getValue();
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
index ae42bd8..0689095 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetCache.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -210,7 +211,8 @@ public class TestFsDatasetCache {
           (StorageReport[]) any(), anyLong(), anyLong(),
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
-          any(SlowDiskReports.class));
+          any(SlowDiskReports.class),
+          any(BulkSyncTaskExecutionFeedback.class));
     } finally {
       lock.writeLock().unlock();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index f501673..6cf3aa6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -964,8 +964,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1015,8 +1015,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index f1b26ab..1c672cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -136,7 +138,8 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+        new BulkSyncTaskExecutionFeedback(new ArrayList<>()));
   }
 
   public static boolean setReplication(final FSNamesystem ns,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 46b8db1..6212292 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -140,8 +140,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-        .getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+            null).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

