Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/03/30 20:59:30 UTC

hadoop git commit: HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. Contributed by Anu Engineer and Haohui Mai.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 24d879026 -> dd5b2dac5


HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. Contributed by Anu Engineer and Haohui Mai.
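
Background: HDFS-7270 added ECN signaling by packing the congestion bits
into the same uint32 "reply" values that 2.6.x clients decode as bare
Status enums. This patch untangles the two: "reply" goes back to plain
Status values, and the combined header moves to a new "flag" field. Below
is a minimal standalone sketch of the packing scheme the combined header
uses; the bit widths mirror PipelineAck's StatusFormat, but treat them as
an assumption rather than a spec.

  // Hypothetical sketch only; the real constants live in
  // PipelineAck.StatusFormat.
  final class AckHeaderSketch {
    static final int STATUS_BITS = 4;                 // assumed Status width
    static final int STATUS_MASK = (1 << STATUS_BITS) - 1;
    static final int ECN_SHIFT   = STATUS_BITS + 1;   // one reserved bit

    static int combine(int ecn, int status) {
      return (ecn << ECN_SHIFT) | (status & STATUS_MASK);
    }

    static int statusOf(int header) { return header & STATUS_MASK; }
    static int ecnOf(int header)    { return header >>> ECN_SHIFT; }
  }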


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd5b2dac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd5b2dac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd5b2dac

Branch: refs/heads/branch-2
Commit: dd5b2dac5a81952f579906ddd1c95a2e915b513e
Parents: 24d8790
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Mar 30 11:59:21 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Mar 30 11:59:32 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 ++
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  2 +-
 .../hdfs/protocol/datatransfer/PipelineAck.java | 31 +++++++++++++-------
 .../hdfs/server/datanode/BlockReceiver.java     |  2 +-
 .../src/main/proto/datatransfer.proto           |  3 +-
 .../hadoop/hdfs/TestDataTransferProtocol.java   | 31 ++++++++++++++++++++
 6 files changed, 58 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b3cc6b7..667aa05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1009,6 +1009,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7963. Fix expected tracing spans in TestTracing along with HDFS-7054.
     (Masatake Iwasaki via kihwal)
 
+    HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck.
+    (Anu Engineer and Haohui Mai via wheat9)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6047825..9c437ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -817,7 +817,7 @@ class DataStreamer extends Daemon {
           // processes response status from datanodes.
           for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
             final Status reply = PipelineAck.getStatusFromHeader(ack
-                .getReply(i));
+                .getHeaderFlag(i));
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply) &&
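
With this change the client asks for the whole header flag and narrows it
to a Status itself; the ECN bits ride in the same int. A sketch of reading
both parts back out (getECNFromHeader exists on PipelineAck alongside
getStatusFromHeader, but verify the name against your branch):

  // Fragment, assuming PipelineAck and Status are already imported.
  int flag = ack.getHeaderFlag(i);
  Status status = PipelineAck.getStatusFromHeader(flag);
  PipelineAck.ECN ecn = PipelineAck.getECNFromHeader(flag);  // assumed API
  if (ecn == PipelineAck.ECN.CONGESTED) {
    // a congestion-aware client could back off here
  }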

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 35e5bb8..9bd4115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -130,13 +130,16 @@ public class PipelineAck {
    */
   public PipelineAck(long seqno, int[] replies,
                      long downstreamAckTimeNanos) {
-    ArrayList<Integer> replyList = Lists.newArrayList();
+    ArrayList<Status> statusList = Lists.newArrayList();
+    ArrayList<Integer> flagList = Lists.newArrayList();
     for (int r : replies) {
-      replyList.add(r);
+      statusList.add(StatusFormat.getStatus(r));
+      flagList.add(r);
     }
     proto = PipelineAckProto.newBuilder()
       .setSeqno(seqno)
-      .addAllReply(replyList)
+      .addAllReply(statusList)
+      .addAllFlag(flagList)
       .setDownstreamAckTimeNanos(downstreamAckTimeNanos)
       .build();
   }
@@ -158,11 +161,18 @@ public class PipelineAck {
   }
   
   /**
-   * get the ith reply
-   * @return the the ith reply
+   * get the header flag of the ith reply
    */
-  public int getReply(int i) {
-    return proto.getReply(i);
+  public int getHeaderFlag(int i) {
+    if (proto.getFlagCount() > 0) {
+      return proto.getFlag(i);
+    } else {
+      return combineHeader(ECN.DISABLED, proto.getReply(i));
+    }
+  }
+
+  public int getFlag(int i) {
+    return proto.getFlag(i);
   }
 
   /**
@@ -178,8 +188,8 @@ public class PipelineAck {
    * @return true if all statuses are SUCCESS
    */
   public boolean isSuccess() {
-    for (int reply : proto.getReplyList()) {
-      if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
+    for (Status s : proto.getReplyList()) {
+      if (s != Status.SUCCESS) {
         return false;
       }
     }
@@ -196,10 +206,9 @@ public class PipelineAck {
     if (getSeqno() != UNKOWN_SEQNO) {
       return null;
     }
-    for (int reply : proto.getReplyList()) {
+    for (Status s : proto.getReplyList()) {
       // The following check is valid because protobuf guarantees to
       // preserve the ordering of enum elements.
-      Status s = StatusFormat.getStatus(reply);
       if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
         return s;
       }
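
With the constructor populating both lists, one ack serves old and new
peers alike: field 2 carries bare Status values, field 4 carries the
combined headers, and getHeaderFlag() synthesizes an ECN.DISABLED header
when field 4 is absent. A short sketch (seqno and timing values are
arbitrary):

  int[] headers = {
      PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status.SUCCESS)
  };
  PipelineAck ack = new PipelineAck(17L, headers, 0L);
  // A new reader recovers the full header, ECN bits intact...
  int flag = ack.getHeaderFlag(0);
  // ...and the plain Status can still be narrowed out of it.
  assert PipelineAck.getStatusFromHeader(flag) == Status.SUCCESS;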

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 0a2b650..4e8ce94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1372,7 +1372,7 @@ class BlockReceiver implements Closeable {
         replies = new int[ackLen + 1];
         replies[0] = myHeader;
         for (int i = 0; i < ackLen; ++i) {
-          replies[i + 1] = ack.getReply(i);
+          replies[i + 1] = ack.getHeaderFlag(i);
         }
         // If the mirror has reported that it received a corrupt packet,
         // do self-destruct to mark myself bad, instead of making the
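
The responder prepends its own combined header before relaying the
downstream flags. The hunk doesn't show how myHeader is built, but
presumably something along these lines (the ECN accessor is an
assumption):

  // Hypothetical construction of myHeader; the ECN source is assumed.
  int myHeader = PipelineAck.combineHeader(
      datanode.getECN(),   // this node's congestion state (assumed accessor)
      myStatus);           // this node's Status for the packet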

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 8426198..5071d15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -243,8 +243,9 @@ enum ShortCircuitFdResponse {
 
 message PipelineAckProto {
   required sint64 seqno = 1;
-  repeated uint32 reply = 2;
+  repeated Status reply = 2;
   optional uint64 downstreamAckTimeNanos = 3 [default = 0];
+  repeated uint32 flag = 4 [packed=true];
 }
 
 /**
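
Keeping "reply" at tag 2 is wire-compatible: enum and uint32 share the
varint wire type, and a parser built against the old schema simply skips
the unknown field 4. A sketch of a new-style ack from that perspective
(class names as generated from this .proto):

  PipelineAckProto newAck = PipelineAckProto.newBuilder()
      .setSeqno(1)
      .addReply(Status.SUCCESS)                        // field 2: plain Status
      .addFlag(PipelineAck.combineHeader(
          PipelineAck.ECN.SUPPORTED, Status.SUCCESS))  // field 4: full header
      .build();
  byte[] wire = newAck.toByteArray();
  // An old reader (no 'flag' in its schema) still decodes field 2 as a
  // valid Status and parks field 4 in its UnknownFieldSet.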

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index a6716b1..16889d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -33,6 +33,7 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import java.io.ByteArrayOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
@@ -524,6 +526,35 @@ public class TestDataTransferProtocol {
     assertFalse(hdr.sanityCheck(100));
   }
 
+  @Test
+  public void testPipeLineAckCompatibility() throws IOException {
+    DataTransferProtos.PipelineAckProto proto = DataTransferProtos
+        .PipelineAckProto.newBuilder()
+        .setSeqno(0)
+        .addReply(Status.CHECKSUM_OK)
+        .build();
+
+    DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
+        .PipelineAckProto.newBuilder().mergeFrom(proto)
+        .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+                                           Status.CHECKSUM_OK))
+        .build();
+
+    ByteArrayOutputStream oldAckBytes = new ByteArrayOutputStream();
+    proto.writeDelimitedTo(oldAckBytes);
+    PipelineAck oldAck = new PipelineAck();
+    oldAck.readFields(new ByteArrayInputStream(oldAckBytes.toByteArray()));
+    assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status
+        .CHECKSUM_OK), oldAck.getHeaderFlag(0));
+
+    PipelineAck newAck = new PipelineAck();
+    ByteArrayOutputStream newAckBytes = new ByteArrayOutputStream();
+    newProto.writeDelimitedTo(newAckBytes);
+    newAck.readFields(new ByteArrayInputStream(newAckBytes.toByteArray()));
+    assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status
+        .CHECKSUM_OK), newAck.getHeaderFlag(0));
+  }
+
   void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
     writeBlock(new ExtendedBlock(poolId, blockId),
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);