You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:06:14 UTC

svn commit: r1077351 - in /hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs: DFSClient.java server/datanode/BlockReceiver.java

Author: omalley
Date: Fri Mar  4 04:06:14 2011
New Revision: 1077351

URL: http://svn.apache.org/viewvc?rev=1077351&view=rev
Log:
commit fb88642e8acee8dacb5c8146b3b70344f6021a84
Author: Hairong Kuang <ha...@ucdev21.inktomisearch.com>
Date:   Sun Mar 21 01:55:11 2010 +0000

    HDFS-101 from https://issues.apache.org/jira/secure/attachment/12439387/pipelineHeartbeat_yahoo.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1077351&r1=1077350&r2=1077351&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Mar  4 04:06:14 2011
@@ -2553,7 +2553,7 @@ public class DFSClient implements FSCons
             // read an ack from the pipeline
             ack.readFields(blockReplyStream);
             if (LOG.isDebugEnabled()) {
-              LOG.debug("DFSClient " + ack);
+              LOG.debug("DFSClient for block " + block + " " + ack);
             }
             long seqno = ack.getSeqno();
             if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
@@ -2567,7 +2567,7 @@ public class DFSClient implements FSCons
               }
               if (one.seqno != seqno) {
                 throw new IOException("Responseprocessor: Expecting seqno " + 
-                                      " for block " + block +
+                                      " for block " + block + " " +
                                       one.seqno + " but received " + seqno);
               }
               lastPacketInBlock = one.lastPacketInBlock;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1077351&r1=1077350&r2=1077351&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Mar  4 04:06:14 2011
@@ -873,18 +873,16 @@ class BlockReceiver implements java.io.C
             PipelineAck ack = new PipelineAck();
             long seqno = -2;
             try { 
-               if (!mirrorError) {
+              if (!mirrorError) {
                 // read an ack from downstream datanode
-                 ack.readFields(mirrorIn);
-                 if (LOG.isDebugEnabled()) {
-                   LOG.debug("PacketResponder " + numTargets + " got " + ack);
-                 }
-                 seqno = ack.getSeqno();
-               }
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                ack.write(replyOut); // send keepalive
-                replyOut.flush();
-              } else if (seqno >= 0 || mirrorError) {
+                ack.readFields(mirrorIn);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + 
+                      " for block " + block + " got " + ack);
+                }
+                seqno = ack.getSeqno();
+              }
+              if (seqno >= 0 || mirrorError) {
                 Packet pkt = null;
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
@@ -963,20 +961,25 @@ class BlockReceiver implements java.io.C
               }
             }
 
-            // construct my ack message
-            short[] replies = null;
-            if (mirrorError) { // no ack is read
-              replies = new short[2];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+            PipelineAck replyAck;
+            if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+              replyAck = ack;  // continue to send keep alive
             } else {
-              replies = new short[1+ack.getNumOfReplies()];
-              replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
-              for (int i=0; i<ack.getNumOfReplies(); i++) {
-                replies[i+1] = ack.getReply(i);
+              // construct my ack message
+              short[] replies = null;
+              if (mirrorError) { // no ack is read
+                replies = new short[2];
+                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+                replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
+              } else {
+                replies = new short[1+ack.getNumOfReplies()];
+                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
+                for (int i=0; i<ack.getNumOfReplies(); i++) {
+                  replies[i+1] = ack.getReply(i);
+                }
               }
+              replyAck = new PipelineAck(expected, replies);
             }
-            PipelineAck replyAck = new PipelineAck(expected, replies);
  
             // send my ack back to upstream datanode
             replyAck.write(replyOut);