You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/02/05 20:03:52 UTC

hadoop git commit: HDFS-7270. Add congestion signaling capability to DataNode write protocol. Contributed by Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 84df660af -> 67002a5fc


HDFS-7270. Add congestion signaling capability to DataNode write protocol. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67002a5f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67002a5f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67002a5f

Branch: refs/heads/branch-2
Commit: 67002a5fc07e2f91c3967801ca532988130660a4
Parents: 84df660
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Feb 5 10:58:58 2015 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Feb 5 11:03:43 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  2 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |  3 +-
 .../hdfs/protocol/datatransfer/PipelineAck.java | 98 +++++++++++++++++---
 .../hdfs/server/datanode/BlockReceiver.java     | 49 +++++-----
 .../hadoop/hdfs/server/datanode/DataNode.java   | 19 ++++
 .../src/main/proto/datatransfer.proto           |  3 +-
 .../hadoop/hdfs/TestDataTransferProtocol.java   | 10 +-
 8 files changed, 147 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 34d8018..038fbdd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -293,6 +293,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7712. Switch blockStateChangeLog to use slf4j. (wang)
 
+    HDFS-7270. Add congestion signaling capability to DataNode write protocol.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 666ec04..36d1a77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -767,4 +767,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String NNTOP_WINDOWS_MINUTES_KEY =
       "dfs.namenode.top.windows.minutes";
   public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
+  public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
+  public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 58995db..601560c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -891,7 +891,8 @@ public class DFSOutputStream extends FSOutputSummer
             long seqno = ack.getSeqno();
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
-              final Status reply = ack.getReply(i);
+              final Status reply = PipelineAck.getStatusFromHeader(ack
+                .getReply(i));
               // Restart will not be treated differently unless it is
               // the local node or the only one in the pipeline.
               if (PipelineAck.isRestartOOBStatus(reply) &&

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 6d40653..35e5bb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -22,17 +22,21 @@ import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import com.google.protobuf.TextFormat;
+import org.apache.hadoop.hdfs.util.LongBitFormat;
 
 /** Pipeline Acknowledgment **/
 @InterfaceAudience.Private
@@ -46,6 +50,55 @@ public class PipelineAck {
   // place holder for timeout value of each OOB type
   final static long[] OOB_TIMEOUT;
 
+  public enum ECN {
+    DISABLED(0),
+    SUPPORTED(1),
+    SUPPORTED2(2),
+    CONGESTED(3);
+
+    private final int value;
+    private static final ECN[] VALUES = values();
+    static ECN valueOf(int value) {
+      return VALUES[value];
+    }
+
+    ECN(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return value;
+    }
+  }
+
+  private enum StatusFormat {
+    STATUS(null, 4),
+    RESERVED(STATUS.BITS, 1),
+    ECN_BITS(RESERVED.BITS, 2);
+
+    private final LongBitFormat BITS;
+
+    StatusFormat(LongBitFormat prev, int bits) {
+      BITS = new LongBitFormat(name(), prev, bits, 0);
+    }
+
+    static Status getStatus(int header) {
+      return Status.valueOf((int) STATUS.BITS.retrieve(header));
+    }
+
+    static ECN getECN(int header) {
+      return ECN.valueOf((int) ECN_BITS.BITS.retrieve(header));
+    }
+
+    public static int setStatus(int old, Status status) {
+      return (int) STATUS.BITS.combine(status.getNumber(), old);
+    }
+
+    public static int setECN(int old, ECN ecn) {
+      return (int) ECN_BITS.BITS.combine(ecn.getValue(), old);
+    }
+  }
+
   static {
     OOB_TIMEOUT = new long[NUM_OOB_TYPES];
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -65,7 +118,7 @@ public class PipelineAck {
    * @param seqno sequence number
    * @param replies an array of replies
    */
-  public PipelineAck(long seqno, Status[] replies) {
+  public PipelineAck(long seqno, int[] replies) {
     this(seqno, replies, 0L);
   }
 
@@ -75,10 +128,15 @@ public class PipelineAck {
    * @param replies an array of replies
    * @param downstreamAckTimeNanos ack RTT in nanoseconds, 0 if no next DN in pipeline
    */
-  public PipelineAck(long seqno, Status[] replies, long downstreamAckTimeNanos) {
+  public PipelineAck(long seqno, int[] replies,
+                     long downstreamAckTimeNanos) {
+    ArrayList<Integer> replyList = Lists.newArrayList();
+    for (int r : replies) {
+      replyList.add(r);
+    }
     proto = PipelineAckProto.newBuilder()
       .setSeqno(seqno)
-      .addAllStatus(Arrays.asList(replies))
+      .addAllReply(replyList)
       .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
       .build();
   }
@@ -96,15 +154,15 @@ public class PipelineAck {
    * @return the number of replies
    */
   public short getNumOfReplies() {
-    return (short)proto.getStatusCount();
+    return (short)proto.getReplyCount();
   }
   
   /**
    * get the ith reply
    * @return the the ith reply
    */
-  public Status getReply(int i) {
-    return proto.getStatus(i);
+  public int getReply(int i) {
+    return proto.getReply(i);
   }
 
   /**
@@ -120,8 +178,8 @@ public class PipelineAck {
    * @return true if all statuses are SUCCESS
    */
   public boolean isSuccess() {
-    for (Status reply : proto.getStatusList()) {
-      if (reply != Status.SUCCESS) {
+    for (int reply : proto.getReplyList()) {
+      if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
         return false;
       }
     }
@@ -138,11 +196,12 @@ public class PipelineAck {
     if (getSeqno() != UNKOWN_SEQNO) {
       return null;
     }
-    for (Status reply : proto.getStatusList()) {
+    for (int reply : proto.getReplyList()) {
       // The following check is valid because protobuf guarantees to
       // preserve the ordering of enum elements.
-      if (reply.getNumber() >= OOB_START && reply.getNumber() <= OOB_END) {
-        return reply;
+      Status s = StatusFormat.getStatus(reply);
+      if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
+        return s;
       }
     }
     return null;
@@ -184,4 +243,19 @@ public class PipelineAck {
   public String toString() {
     return TextFormat.shortDebugString(proto);
   }
+
+  public static Status getStatusFromHeader(int header) {
+    return StatusFormat.getStatus(header);
+  }
+
+  public static int setStatusForHeader(int old, Status status) {
+    return StatusFormat.setStatus(old, status);
+  }
+
+  public static int combineHeader(ECN ecn, Status status) {
+    int header = 0;
+    header = StatusFormat.setStatus(header, status);
+    header = StatusFormat.setECN(header, ecn);
+    return header;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 6e37c23..cfd2442 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -986,9 +986,7 @@ class BlockReceiver implements Closeable {
   private static enum PacketResponderType {
     NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
   }
-  
-  private static final Status[] MIRROR_ERROR_STATUS = {Status.SUCCESS, Status.ERROR};
-  
+
   /**
    * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
@@ -1092,7 +1090,7 @@ class BlockReceiver implements Closeable {
       LOG.info("Sending an out of band ack of type " + ackStatus);
       try {
         sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-            ackStatus);
+            PipelineAck.combineHeader(datanode.getECN(), ackStatus));
       } finally {
         // Let others send ack. Unless there are miltiple OOB send
         // calls, there can be only one waiter, the responder thread.
@@ -1175,7 +1173,8 @@ class BlockReceiver implements Closeable {
               if (oobStatus != null) {
                 LOG.info("Relaying an out of band ack of type " + oobStatus);
                 sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
-                    Status.SUCCESS);
+                    PipelineAck.combineHeader(datanode.getECN(),
+                      Status.SUCCESS));
                 continue;
               }
               seqno = ack.getSeqno();
@@ -1249,9 +1248,10 @@ class BlockReceiver implements Closeable {
             finalizeBlock(startTime);
           }
 
+          Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
           sendAckUpstream(ack, expected, totalAckTimeNanos,
-              (pkt != null ? pkt.offsetInBlock : 0), 
-              (pkt != null ? pkt.ackStatus : Status.SUCCESS));
+            (pkt != null ? pkt.offsetInBlock : 0),
+            PipelineAck.combineHeader(datanode.getECN(), myStatus));
           if (pkt != null) {
             // remove the packet from the ack queue
             removeAckHead();
@@ -1311,11 +1311,11 @@ class BlockReceiver implements Closeable {
      * @param totalAckTimeNanos total ack time including all the downstream
      *          nodes
      * @param offsetInBlock offset in block for the data in packet
-     * @param myStatus the local ack status
+     * @param myHeader the local ack header (this node's ECN and status bits)
      */
     private void sendAckUpstream(PipelineAck ack, long seqno,
         long totalAckTimeNanos, long offsetInBlock,
-        Status myStatus) throws IOException {
+        int myHeader) throws IOException {
       try {
         // Wait for other sender to finish. Unless there is an OOB being sent,
         // the responder won't have to wait.
@@ -1329,7 +1329,7 @@ class BlockReceiver implements Closeable {
         try {
           if (!running) return;
           sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
-              offsetInBlock, myStatus);
+              offsetInBlock, myHeader);
         } finally {
           synchronized(this) {
             sending = false;
@@ -1349,32 +1349,34 @@ class BlockReceiver implements Closeable {
      * @param totalAckTimeNanos total ack time including all the downstream
      *          nodes
      * @param offsetInBlock offset in block for the data in packet
-     * @param myStatus the local ack status
+     * @param myHeader the local ack header (this node's ECN and status bits)
      */
     private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
-        long totalAckTimeNanos, long offsetInBlock, Status myStatus)
+        long totalAckTimeNanos, long offsetInBlock, int myHeader)
         throws IOException {
-      Status[] replies = null;
+      final int[] replies;
       if (ack == null) {
         // A new OOB response is being sent from this node. Regardless of
         // downstream nodes, reply should contain one reply.
-        replies = new Status[1];
-        replies[0] = myStatus;
+        replies = new int[] { myHeader };
       } else if (mirrorError) { // ack read error
-        replies = MIRROR_ERROR_STATUS;
+        int h = PipelineAck.combineHeader(datanode.getECN(), Status.SUCCESS);
+        int h1 = PipelineAck.combineHeader(datanode.getECN(), Status.ERROR);
+        replies = new int[] {h, h1};
       } else {
         short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
             .getNumOfReplies();
-        replies = new Status[1 + ackLen];
-        replies[0] = myStatus;
-        for (int i = 0; i < ackLen; i++) {
+        replies = new int[ackLen + 1];
+        replies[0] = myHeader;
+        for (int i = 0; i < ackLen; ++i) {
           replies[i + 1] = ack.getReply(i);
         }
         // If the mirror has reported that it received a corrupt packet,
-        // do self-destruct to mark myself bad, instead of making the 
+        // do self-destruct to mark myself bad, instead of making the
         // mirror node bad. The mirror is guaranteed to be good without
         // corrupt data on disk.
-        if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
+        if (ackLen > 0 && PipelineAck.getStatusFromHeader(replies[1]) ==
+          Status.ERROR_CHECKSUM) {
           throw new IOException("Shutting down writer and responder "
               + "since the down streams reported the data sent by this "
               + "thread is corrupt");
@@ -1400,7 +1402,8 @@ class BlockReceiver implements Closeable {
       }
 
       // If a corruption was detected in the received data, terminate after
-      // sending ERROR_CHECKSUM back. 
+      // sending ERROR_CHECKSUM back.
+      Status myStatus = PipelineAck.getStatusFromHeader(myHeader);
       if (myStatus == Status.ERROR_CHECKSUM) {
         throw new IOException("Shutting down writer and responder "
             + "due to a checksum error in received data. The error "
@@ -1420,7 +1423,7 @@ class BlockReceiver implements Closeable {
       }
     }
   }
-  
+
   /**
    * This information is cached by the Datanode in the ackQueue.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index dc6734c..b872bbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
@@ -336,6 +337,7 @@ public class DataNode extends ReconfigurableBase
   private Configuration conf;
   private final String confVersion;
   private final long maxNumberOfBlocksToLog;
+  private final boolean pipelineSupportECN;
 
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
@@ -368,6 +370,7 @@ public class DataNode extends ReconfigurableBase
     this.usersWithLocalPathAccess = null;
     this.connectToDnViaHostname = false;
     this.getHdfsBlockLocationsEnabled = false;
+    this.pipelineSupportECN = false;
   }
 
   /**
@@ -395,6 +398,9 @@ public class DataNode extends ReconfigurableBase
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
+    this.pipelineSupportECN = conf.getBoolean(
+        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
+        DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
 
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -470,6 +476,19 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
+   * The ECN state of this DataNode. The DataNode should return:
+   * <ul>
+   *   <li>ECN.DISABLED when ECN is disabled.</li>
+   *   <li>ECN.SUPPORTED when ECN is enabled but the DN still has capacity.</li>
+   *   <li>ECN.CONGESTED when ECN is enabled and the DN is congested.</li>
+   * </ul> Congestion is not detected yet, so CONGESTED is never returned.
+   */
+  public PipelineAck.ECN getECN() {
+    return pipelineSupportECN ? PipelineAck.ECN.SUPPORTED : PipelineAck.ECN
+      .DISABLED;
+  }
+
+  /**
    * Contains the StorageLocations for changed data volumes.
    */
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 4bd7bda..9512688 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -210,6 +210,7 @@ message PacketHeaderProto {
   optional bool syncBlock = 5 [default = false];
 }
 
+// Status must fit in 4 bits: PipelineAck stores it in a 4-bit reply field
 enum Status {
   SUCCESS = 0;
   ERROR = 1;
@@ -228,7 +229,7 @@ enum Status {
 
 message PipelineAckProto {
   required sint64 seqno = 1;
-  repeated Status status = 2;
+  repeated uint32 reply = 2;
   optional uint64 downstreamAckTimeNanos = 3 [default = 0];
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67002a5f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 6405b5a..fd4f1a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -168,7 +168,9 @@ public class TestDataTransferProtocol {
 
     //ok finally write a block with 0 len
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write
+      (recvOut);
     sendRecvData(description, false);
   }
   
@@ -399,7 +401,8 @@ public class TestDataTransferProtocol {
     hdr.write(sendOut);
 
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.ERROR)}).write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
                  true);
 
@@ -420,7 +423,8 @@ public class TestDataTransferProtocol {
     sendOut.flush();
     //ok finally write a block with 0 len
     sendResponse(Status.SUCCESS, "", null, recvOut);
-    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
+    new PipelineAck(100, new int[] {PipelineAck.combineHeader
+      (PipelineAck.ECN.DISABLED, Status.SUCCESS)}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */