You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:06:14 UTC
svn commit: r1077351 - in
/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs:
DFSClient.java server/datanode/BlockReceiver.java
Author: omalley
Date: Fri Mar 4 04:06:14 2011
New Revision: 1077351
URL: http://svn.apache.org/viewvc?rev=1077351&view=rev
Log:
commit fb88642e8acee8dacb5c8146b3b70344f6021a84
Author: Hairong Kuang <ha...@ucdev21.inktomisearch.com>
Date: Sun Mar 21 01:55:11 2010 +0000
HDFS-101 from https://issues.apache.org/jira/secure/attachment/12439387/pipelineHeartbeat_yahoo.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077351&r1=1077350&r2=1077351&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar 4 04:06:14 2011
@@ -2553,7 +2553,7 @@ public class DFSClient implements FSCons
// read an ack from the pipeline
ack.readFields(blockReplyStream);
if (LOG.isDebugEnabled()) {
- LOG.debug("DFSClient " + ack);
+ LOG.debug("DFSClient for block " + block + " " + ack);
}
long seqno = ack.getSeqno();
if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
@@ -2567,7 +2567,7 @@ public class DFSClient implements FSCons
}
if (one.seqno != seqno) {
throw new IOException("Responseprocessor: Expecting seqno " +
- " for block " + block +
+ " for block " + block + " " +
one.seqno + " but received " + seqno);
}
lastPacketInBlock = one.lastPacketInBlock;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1077351&r1=1077350&r2=1077351&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Mar 4 04:06:14 2011
@@ -873,18 +873,16 @@ class BlockReceiver implements java.io.C
PipelineAck ack = new PipelineAck();
long seqno = -2;
try {
- if (!mirrorError) {
+ if (!mirrorError) {
// read an ack from downstream datanode
- ack.readFields(mirrorIn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
- }
- seqno = ack.getSeqno();
- }
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- ack.write(replyOut); // send keepalive
- replyOut.flush();
- } else if (seqno >= 0 || mirrorError) {
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets +
+ " for block " + block + " got " + ack);
+ }
+ seqno = ack.getSeqno();
+ }
+ if (seqno >= 0 || mirrorError) {
Packet pkt = null;
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -963,20 +961,25 @@ class BlockReceiver implements java.io.C
}
}
- // construct my ack message
- short[] replies = null;
- if (mirrorError) { // no ack is read
- replies = new short[2];
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
- replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+ PipelineAck replyAck;
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+ replyAck = ack; // continue to send keep alive
} else {
- replies = new short[1+ack.getNumOfReplies()];
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
- for (int i=0; i<ack.getNumOfReplies(); i++) {
- replies[i+1] = ack.getReply(i);
+ // construct my ack message
+ short[] replies = null;
+ if (mirrorError) { // no ack is read
+ replies = new short[2];
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+ replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+ } else {
+ replies = new short[1+ack.getNumOfReplies()];
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+ for (int i=0; i<ack.getNumOfReplies(); i++) {
+ replies[i+1] = ack.getReply(i);
+ }
}
+ replyAck = new PipelineAck(expected, replies);
}
- PipelineAck replyAck = new PipelineAck(expected, replies);
// send my ack back to upstream datanode
replyAck.write(replyOut);