You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/03/30 20:59:30 UTC
hadoop git commit: HDFS-7748. Separate ECN flags from the Status in
the DataTransferPipelineAck. Contributed by Anu Engineer and Haohui Mai.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 24d879026 -> dd5b2dac5
HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck. Contributed by Anu Engineer and Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd5b2dac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd5b2dac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd5b2dac
Branch: refs/heads/branch-2
Commit: dd5b2dac5a81952f579906ddd1c95a2e915b513e
Parents: 24d8790
Author: Haohui Mai <wh...@apache.org>
Authored: Mon Mar 30 11:59:21 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Mon Mar 30 11:59:32 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++
.../org/apache/hadoop/hdfs/DataStreamer.java | 2 +-
.../hdfs/protocol/datatransfer/PipelineAck.java | 31 +++++++++++++-------
.../hdfs/server/datanode/BlockReceiver.java | 2 +-
.../src/main/proto/datatransfer.proto | 3 +-
.../hadoop/hdfs/TestDataTransferProtocol.java | 31 ++++++++++++++++++++
6 files changed, 58 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b3cc6b7..667aa05 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1009,6 +1009,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7963. Fix expected tracing spans in TestTracing along with HDFS-7054.
(Masatake Iwasaki via kihwal)
+ HDFS-7748. Separate ECN flags from the Status in the DataTransferPipelineAck.
+ (Anu Engineer and Haohui Mai via wheat9)
+
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 6047825..9c437ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -817,7 +817,7 @@ class DataStreamer extends Daemon {
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = PipelineAck.getStatusFromHeader(ack
- .getReply(i));
+ .getHeaderFlag(i));
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 35e5bb8..9bd4115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -130,13 +130,16 @@ public class PipelineAck {
*/
public PipelineAck(long seqno, int[] replies,
long downstreamAckTimeNanos) {
- ArrayList<Integer> replyList = Lists.newArrayList();
+ ArrayList<Status> statusList = Lists.newArrayList();
+ ArrayList<Integer> flagList = Lists.newArrayList();
for (int r : replies) {
- replyList.add(r);
+ statusList.add(StatusFormat.getStatus(r));
+ flagList.add(r);
}
proto = PipelineAckProto.newBuilder()
.setSeqno(seqno)
- .addAllReply(replyList)
+ .addAllReply(statusList)
+ .addAllFlag(flagList)
.setDownstreamAckTimeNanos(downstreamAckTimeNanos)
.build();
}
@@ -158,11 +161,18 @@ public class PipelineAck {
}
/**
- * get the ith reply
- * @return the the ith reply
+ * get the header flag of ith reply
*/
- public int getReply(int i) {
- return proto.getReply(i);
+ public int getHeaderFlag(int i) {
+ if (proto.getFlagCount() > 0) {
+ return proto.getFlag(i);
+ } else {
+ return combineHeader(ECN.DISABLED, proto.getReply(i));
+ }
+ }
+
+ public int getFlag(int i) {
+ return proto.getFlag(i);
}
/**
@@ -178,8 +188,8 @@ public class PipelineAck {
* @return true if all statuses are SUCCESS
*/
public boolean isSuccess() {
- for (int reply : proto.getReplyList()) {
- if (StatusFormat.getStatus(reply) != Status.SUCCESS) {
+ for (Status s : proto.getReplyList()) {
+ if (s != Status.SUCCESS) {
return false;
}
}
@@ -196,10 +206,9 @@ public class PipelineAck {
if (getSeqno() != UNKOWN_SEQNO) {
return null;
}
- for (int reply : proto.getReplyList()) {
+ for (Status s : proto.getReplyList()) {
// The following check is valid because protobuf guarantees to
// preserve the ordering of enum elements.
- Status s = StatusFormat.getStatus(reply);
if (s.getNumber() >= OOB_START && s.getNumber() <= OOB_END) {
return s;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 0a2b650..4e8ce94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -1372,7 +1372,7 @@ class BlockReceiver implements Closeable {
replies = new int[ackLen + 1];
replies[0] = myHeader;
for (int i = 0; i < ackLen; ++i) {
- replies[i + 1] = ack.getReply(i);
+ replies[i + 1] = ack.getHeaderFlag(i);
}
// If the mirror has reported that it received a corrupt packet,
// do self-destruct to mark myself bad, instead of making the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 8426198..5071d15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -243,8 +243,9 @@ enum ShortCircuitFdResponse {
message PipelineAckProto {
required sint64 seqno = 1;
- repeated uint32 reply = 2;
+ repeated Status reply = 2;
optional uint64 downstreamAckTimeNanos = 3 [default = 0];
+ repeated uint32 flag = 4 [packed=true];
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd5b2dac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index a6716b1..16889d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -33,6 +33,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Random;
+import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
@@ -524,6 +526,35 @@ public class TestDataTransferProtocol {
assertFalse(hdr.sanityCheck(100));
}
+ @Test
+ public void TestPipeLineAckCompatibility() throws IOException {
+ DataTransferProtos.PipelineAckProto proto = DataTransferProtos
+ .PipelineAckProto.newBuilder()
+ .setSeqno(0)
+ .addReply(Status.CHECKSUM_OK)
+ .build();
+
+ DataTransferProtos.PipelineAckProto newProto = DataTransferProtos
+ .PipelineAckProto.newBuilder().mergeFrom(proto)
+ .addFlag(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED,
+ Status.CHECKSUM_OK))
+ .build();
+
+ ByteOutputStream oldAckBytes = new ByteOutputStream();
+ proto.writeDelimitedTo(oldAckBytes);
+ PipelineAck oldAck = new PipelineAck();
+ oldAck.readFields(new ByteArrayInputStream(oldAckBytes.getBytes()));
+ assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.DISABLED, Status
+ .CHECKSUM_OK), oldAck.getHeaderFlag(0));
+
+ PipelineAck newAck = new PipelineAck();
+ ByteOutputStream newAckBytes = new ByteOutputStream();
+ newProto.writeDelimitedTo(newAckBytes);
+ newAck.readFields(new ByteArrayInputStream(newAckBytes.getBytes()));
+ assertEquals(PipelineAck.combineHeader(PipelineAck.ECN.SUPPORTED, Status
+ .CHECKSUM_OK), newAck.getHeaderFlag(0));
+ }
+
void writeBlock(String poolId, long blockId, DataChecksum checksum) throws IOException {
writeBlock(new ExtendedBlock(poolId, blockId),
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, checksum);