Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/15 07:08:39 UTC

svn commit: r890655 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/ha...

Author: hairong
Date: Tue Dec 15 06:08:39 2009
New Revision: 890655

URL: http://svn.apache.org/viewvc?rev=890655&view=rev
Log:
HDFS-724. Pipeline hangs when one of the block receivers is not responsive. Contributed by Hairong Kuang.

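In brief: before this patch, pipeline heartbeats originated at the last datanode
(see the removed lastDataNodeRun() below), so upstream datanodes could still
block on read and time out while the client was idle. With this patch the
client's DataStreamer originates the heartbeats: whenever the data queue has
been empty for half the socket timeout during DATA_STREAMING, it sends a
zero-payload packet with the sentinel seqno -1, which every datanode forwards
and acks like an ordinary packet. A minimal sketch of the timing rule, using
names from the diffs below (the class itself is illustrative, not part of the
patch):

    /** Illustrative only: the idle-heartbeat rule DataStreamer.run() now applies. */
    class HeartbeatTimer {
      static final long HEART_BEAT_SEQNO = -1L;  // sentinel seqno of a heartbeat Packet
      private final long socketTimeout;          // dfs.client.socket.timeout, in ms
      private long lastPacket = System.currentTimeMillis();

      HeartbeatTimer(long socketTimeout) { this.socketTimeout = socketTimeout; }

      /** Upper bound on how long the streamer may block in dataQueue.wait(). */
      long waitTimeout(long now) {
        long timeout = socketTimeout/2 - (now - lastPacket);
        return timeout <= 0 ? 1000 : timeout;
      }

      /** An idle streamer fabricates a heartbeat once half the timeout has elapsed. */
      boolean heartbeatDue(long now) {
        return now - lastPacket >= socketTimeout/2;
      }

      void onPacketSent(long now) { lastPacket = now; }
    }
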
Added:
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 15 06:08:39 2009
@@ -603,6 +603,9 @@
     HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
     name-node is in safemode. (Ravi Phulari via shv)
 
+    HDFS-724. Pipeline hangs if one of the block receivers is not responsive.
+    (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Dec 15 06:08:39 2009
@@ -2475,7 +2475,27 @@
       int     dataPos;
       int     checksumStart;
       int     checksumPos;      
-  
+      private static final long HEART_BEAT_SEQNO = -1L;
+
+      /**
+       *  create a heartbeat packet
+       */
+      Packet() {
+        this.lastPacketInBlock = false;
+        this.numChunks = 0;
+        this.offsetInBlock = 0;
+        this.seqno = HEART_BEAT_SEQNO;
+        
+        buffer = null;
+        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        buf = new byte[packetSize];
+        
+        checksumStart = dataStart = packetSize;
+        checksumPos = checksumStart;
+        dataPos = dataStart;
+        maxChunks = 0;
+      }
+      
       // create a new packet
       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
@@ -2562,6 +2582,14 @@
         return offsetInBlock + dataPos - dataStart;
       }
       
+      /**
+       * Check if this packet is a heart beat packet
+       * @return true if the sequence number is HEART_BEAT_SEQNO
+       */
+      private boolean isHeartbeatPacket() {
+        return seqno == HEART_BEAT_SEQNO;
+      }
+      
       public String toString() {
         return "packet seqno:" + this.seqno +
         " offsetInBlock:" + this.offsetInBlock + 
@@ -2680,6 +2708,7 @@
        * and closes them. Any error recovery is also done by this thread.
        */
       public void run() {
+        long lastPacket = System.currentTimeMillis();
         while (!streamerClosed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
@@ -2703,19 +2732,32 @@
 
             synchronized (dataQueue) {
               // wait for a packet to be sent.
+              long now = System.currentTimeMillis();
               while ((!streamerClosed && !hasError && clientRunning 
-                  && dataQueue.size() == 0) || doSleep) {
+                  && dataQueue.size() == 0 && 
+                  (stage != BlockConstructionStage.DATA_STREAMING || 
+                   stage == BlockConstructionStage.DATA_STREAMING && 
+                   now - lastPacket < socketTimeout/2)) || doSleep ) {
+                long timeout = socketTimeout/2 - (now-lastPacket);
+                timeout = timeout <= 0 ? 1000 : timeout;
+                timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+                   timeout : 1000;
                 try {
-                  dataQueue.wait(1000);
+                  dataQueue.wait(timeout);
                 } catch (InterruptedException  e) {
                 }
                 doSleep = false;
+                now = System.currentTimeMillis();
               }
-              if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+              if (streamerClosed || hasError || !clientRunning) {
                 continue;
               }
               // get packet to be sent.
-              one = dataQueue.getFirst();
+              if (dataQueue.isEmpty()) {
+                one = new Packet();  // heartbeat packet
+              } else {
+                one = dataQueue.getFirst(); // regular data packet
+              }
             }
 
             // get new block from namenode.
@@ -2761,9 +2803,11 @@
 
             synchronized (dataQueue) {
               // move packet from dataQueue to ackQueue
-              dataQueue.removeFirst();
-              ackQueue.addLast(one);
-              dataQueue.notifyAll();
+              if (!one.isHeartbeatPacket()) {
+                dataQueue.removeFirst();
+                ackQueue.addLast(one);
+                dataQueue.notifyAll();
+              }
             }
 
             if (LOG.isDebugEnabled()) {
@@ -2774,6 +2818,7 @@
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
             blockStream.flush();
+            lastPacket = System.currentTimeMillis();
             
             // update bytesSent
             long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2910,24 +2958,7 @@
               }
               
               long seqno = ack.getSeqno();
-              Packet one = null;
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                continue;
-              } else if (seqno == -2) {
-                // no nothing
-              } else {
-                synchronized (dataQueue) {
-                  one = ackQueue.getFirst();
-                }
-                if (one.seqno != seqno) {
-                  throw new IOException("Responseprocessor: Expecting seqno " + 
-                      " for block " + block +
-                      one.seqno + " but received " + seqno);
-                }
-                isLastPacketInBlock = one.lastPacketInBlock;
-              }
-
-              // processes response status from all datanodes.
+              // processes response status from datanodes.
               for (int i = ack.getNumOfReplies()-1; i >=0  && clientRunning; i--) {
                 final DataTransferProtocol.Status reply = ack.getReply(i);
                 if (reply != SUCCESS) {
@@ -2938,12 +2969,24 @@
                       targets[i].getName());
                 }
               }
+              
+              assert seqno != PipelineAck.UNKOWN_SEQNO : 
+                "Ack for unknown seqno should be a failed ack: " + ack;
+              if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
+                continue;
+              }
 
-              if (one == null) {
-                throw new IOException("Panic: responder did not receive " +
-                    "an ack for a packet: " + seqno);
+              // a success ack for a data packet
+              Packet one = null;
+              synchronized (dataQueue) {
+                one = ackQueue.getFirst();
               }
-              
+              if (one.seqno != seqno) {
+                throw new IOException("Responseprocessor: Expecting seqno " +
+                                      one.seqno + " for block " + block +
+                                      " but received " + seqno);
+              }
+              isLastPacketInBlock = one.lastPacketInBlock;
               // update bytesAcked
               block.setNumBytes(one.getLastByteOffsetBlock());
 

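To summarize the client-side flow above: a heartbeat Packet is fabricated only
when dataQueue drains during streaming; it is written to the pipeline but never
moved onto ackQueue; and ResponseProcessor drops heartbeat acks (seqno -1)
before any seqno matching. A condensed paraphrase, with hypothetical helper
names standing in for the inline logic in DataStreamer.run():

    // Hypothetical paraphrase of the inline logic above; not part of the patch.
    import java.util.LinkedList;

    class StreamerSketch {
      Packet nextPacket(LinkedList<Packet> dataQueue) {   // caller holds dataQueue lock
        return dataQueue.isEmpty()
            ? new Packet()           // heartbeat: seqno == HEART_BEAT_SEQNO, no payload
            : dataQueue.getFirst();  // regular packet; removed once it is on the wire
      }

      void enqueueForAck(Packet one, LinkedList<Packet> dataQueue,
                         LinkedList<Packet> ackQueue) {
        if (!one.isHeartbeatPacket()) { // heartbeats are never awaited on ackQueue
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }
    }
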
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Dec 15 06:08:39 2009
@@ -458,7 +458,7 @@
   public static class PipelineAck implements Writable {
     private long seqno;
     private Status replies[];
-    final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new Status[0]);
+    public final static long UNKOWN_SEQNO = -2;
 
     /** default constructor **/
     public PipelineAck() {
@@ -495,6 +495,10 @@
      * @return the ith reply
      */
     public Status getReply(int i) {
+      if (i<0 || i>=replies.length) {
+        throw new IllegalArgumentException("The input parameter " + i +
+            " should be in the range [0, " + replies.length + ")");
+      }
       return replies[i];
     }
     

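Protocol-wise, the shared PipelineAck.HEART_BEAT singleton is gone: a heartbeat
ack is now an ordinary ack that carries the heartbeat seqno -1, and UNKOWN_SEQNO
(-2, spelling as committed) marks an ack whose seqno the responder could not
determine, which the client asserts must be a failed ack. A sketch of the two
cases (illustrative helpers, using the PipelineAck(long, Status[]) constructor
defined in this class; SUCCESS and ERROR are DataTransferProtocol.Status values):

    // Sketch (illustrative): how acks look after this change.
    static PipelineAck heartbeatAck() {
      return new PipelineAck(-1L, new Status[]{SUCCESS});  // acks a client heartbeat
    }

    static PipelineAck unattributedError() {
      // failure that cannot be tied to a specific packet
      return new PipelineAck(PipelineAck.UNKOWN_SEQNO, new Status[]{ERROR});
    }
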
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 15 06:08:39 2009
@@ -560,7 +560,7 @@
       throttler.throttle(len);
     }
     
-    return len;
+    return lastPacketInBlock?-1:len;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -589,9 +589,9 @@
       }
 
       /* 
-       * Receive until packet has zero bytes of data.
+       * Receive until the last packet.
        */
-      while (receivePacket() > 0) {}
+      while (receivePacket() >= 0) {}
 
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
@@ -775,118 +775,11 @@
       notifyAll();
     }
 
-    private synchronized void lastDataNodeRun() {
-      long lastHeartbeat = System.currentTimeMillis();
-      boolean lastPacket = false;
-      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
-      while (running && datanode.shouldRun && !lastPacket) {
-        long now = System.currentTimeMillis();
-        try {
-
-            // wait for a packet to be sent to downstream datanode
-            while (running && datanode.shouldRun && ackQueue.size() == 0) {
-              long idle = now - lastHeartbeat;
-              long timeout = (datanode.socketTimeout/2) - idle;
-              if (timeout <= 0) {
-                timeout = 1000;
-              }
-              try {
-                wait(timeout);
-              } catch (InterruptedException e) {
-                if (running) {
-                  LOG.info("PacketResponder " + numTargets +
-                           " for block " + block + " Interrupted.");
-                  running = false;
-                }
-                break;
-              }
-          
-              // send a heartbeat if it is time.
-              now = System.currentTimeMillis();
-              if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                PipelineAck.HEART_BEAT.write(replyOut);  // send heart beat
-                replyOut.flush();
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets +
-                            " for block " + block + 
-                            " sent a heartbeat");
-                }
-                lastHeartbeat = now;
-              }
-            }
-
-            if (!running || !datanode.shouldRun) {
-              break;
-            }
-            Packet pkt = ackQueue.getFirst();
-            long expected = pkt.seqno;
-            LOG.debug("PacketResponder " + numTargets +
-                      " for block " + block + 
-                      " acking for packet " + expected);
-
-            // If this is the last packet in block, then close block
-            // file and finalize the block before responding success
-            if (pkt.lastPacketInBlock) {
-              receiver.close();
-              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(replicaInfo.getNumBytes());
-              datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
-              if (ClientTraceLog.isInfoEnabled() &&
-                  receiver.clientName.length() > 0) {
-                long offset = 0;
-                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                    receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                    "HDFS_WRITE", receiver.clientName, offset,
-                    datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-              } else {
-                LOG.info("Received block " + block + 
-                    " of size " + block.getNumBytes() + 
-                    " from " + receiver.inAddr);
-              }
-              lastPacket = true;
-            }
-
-            new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
-            replyOut.flush();
-            // remove the packet from the ack queue
-            removeAckHead();
-            // update the bytes acked
-            if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
-            }
-        } catch (Exception e) {
-          LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
-          if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
-                  ioe);
-            }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
-            running = false;
-          }
-        }
-      }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
      */
     public void run() {
-
-      // If this is the last datanode in pipeline, then handle differently
-      if (numTargets == 0) {
-        lastDataNodeRun();
-        return;
-      }
-
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
@@ -897,19 +790,18 @@
             Packet pkt = null;
             long expected = -2;
             PipelineAck ack = new PipelineAck();
-            try { 
-              // read an ack from downstream datanode
-              ack.readFields(mirrorIn);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("PacketResponder " + numTargets + " got " + ack);
+            long seqno = PipelineAck.UNKOWN_SEQNO;
+            try {
+              if (numTargets != 0) {// not the last DN
+                // read an ack from downstream datanode
+                ack.readFields(mirrorIn);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                }
+                seqno = ack.getSeqno();
+                didRead = true;
               }
-              long seqno = ack.getSeqno();
-              didRead = true;
-              if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
-                ack.write(replyOut);
-                replyOut.flush();
-                continue;
-              } else if (seqno >= 0) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -925,9 +817,12 @@
                       throw e;
                     }
                   }
+                  if (!running || !datanode.shouldRun) {
+                    break;
+                  }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (seqno != expected) {
+                  if (numTargets > 0 && seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
                                           " expected seqno:" + expected +
@@ -983,14 +878,15 @@
 
             // construct my ack message
             Status[] replies = null;
-            if (!didRead) { // no ack is read
+            if (!didRead && numTargets != 0) { // ack read error
               replies = new Status[2];
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              replies = new Status[1+ack.getNumOfReplies()];
+              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
-              for (int i=0; i<ack.getNumOfReplies(); i++) {
+              for (int i=0; i<ackLen; i++) {
                 replies[i+1] = ack.getReply(i);
               }
             }

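With lastDataNodeRun() removed, every PacketResponder runs the same loop: the
last datanode (numTargets == 0) skips the downstream read and acks directly from
its own ackQueue, while an intermediate datanode still reads the downstream ack
and prepends its own status. A sketch of the reply assembly at the bottom of the
loop (hypothetical helper mirroring the code above; Status and PipelineAck come
from DataTransferProtocol):

    // Hypothetical helper mirroring the reply construction in run() above.
    Status[] buildReplies(boolean didRead, int numTargets, PipelineAck downstream) {
      if (!didRead && numTargets != 0) {  // downstream ack could not be read
        return new Status[]{SUCCESS, ERROR};
      }
      short ackLen = numTargets == 0 ? 0 : downstream.getNumOfReplies();
      Status[] replies = new Status[1 + ackLen];
      replies[0] = SUCCESS;               // this datanode's own verdict comes first
      for (int i = 0; i < ackLen; i++) {
        replies[i + 1] = downstream.getReply(i);  // then the downstream statuses
      }
      return replies;
    }
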
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 15 06:08:39 2009
@@ -99,12 +99,6 @@
     && args(acked) 
     && this(pr);
 
-  pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) : 
-    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
-    && withincode (void PacketResponder.lastDataNodeRun())
-    && args(acked) 
-    && this(pr);
-  
   after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
     PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
     if (pTest == null) {
@@ -117,19 +111,7 @@
       bytesAckedService((PipelinesTest)pTest, pr, acked);
     }
   }
-  after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
-    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
-    if (pTest == null) {
-      LOG.debug("FI: no pipeline has been found in acking");
-      return;
-    }
-    LOG.debug("FI: Acked total bytes from (last DN): " + 
-        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
-    if (pTest instanceof PipelinesTest) {
-      bytesAckedService((PipelinesTest)pTest, pr, acked); 
-    }
-  }
-  
+
   private void bytesAckedService 
       (final PipelinesTest pTest, final PacketResponder pr, final long acked) {
     NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java?rev=890655&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Tue Dec 15 06:08:39 2009
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiPipelineClose {
+  static final short REPLICATION = 3;
+  static final long BLOCKSIZE = 1L * (1L << 20);
+
+  static final Configuration conf = new HdfsConfiguration();
+  static {
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+  }
+
+  static private FSDataOutputStream createFile(FileSystem fs, Path p
+      ) throws IOException {
+    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+        REPLICATION, BLOCKSIZE);
+  }
+
+  /**
+   * 1. create a file with dfs
+   * 2. write 1 byte
+   * 3. close file
+   * 4. open the same file
+   * 5. read the 1 byte and compare results
+   */
+  private static void write1byte(String methodName) throws IOException {
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
+        null);
+    final FileSystem dfs = cluster.getFileSystem();
+    try {
+      final Path p = new Path("/" + methodName + "/foo");
+      final FSDataOutputStream out = createFile(dfs, p);
+      out.write(1);
+      out.close();
+      
+      final FSDataInputStream in = dfs.open(p);
+      final int b = in.read();
+      in.close();
+      Assert.assertEquals(1, b);
+    }
+    finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+
+  private static void runPipelineCloseTest(String methodName,
+      Action<DatanodeID> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiPipelineClose.set(a);
+    write1byte(methodName);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 never responds after receiving the close request from the client.
+   * The client gets an IOException and determines that DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_36() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new SleepAction(methodName, 0, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 never responds after receiving the close request from the client.
+   * The client gets an IOException and determines that DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_37() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new SleepAction(methodName, 1, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 never responds after receiving the close request from the client.
+   * The client gets an IOException and determines that DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_38() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Tue Dec 15 06:08:39 2009
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import org.junit.Test;
@@ -30,6 +34,11 @@
 /** Class contains a set of tests to verify the correctness of 
  * newly introduced {@link FSDataOutputStream#hflush()} method */
 public class TestHFlush {
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
   private final String fName = "hflushtest.dat";
   
   /** The test uses {@link #doTheJob(Configuration, String, long, short)
@@ -143,4 +152,55 @@
       actual[idx] = 0;
     }
   }
+  
+  /** This creates a slow writer and checks that
+   * pipeline heartbeats work correctly.
+   */
+  @Test
+  public void testPipelineHeartbeat() throws Exception {
+    final int DATANODE_NUM = 2;
+    final int fileLen = 6;
+    Configuration conf = new HdfsConfiguration();
+    final int timeout = 2000;
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
+        timeout);
+
+    final Path p = new Path("/pipelineHeartbeat/foo");
+    System.out.println("p=" + p);
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+    // create a new file.
+    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+    stm.write(fileContents, 0, 1);
+    Thread.sleep(timeout);
+    stm.hflush();
+    System.out.println("Wrote 1 byte and hflush " + p);
+
+    // write another byte
+    Thread.sleep(timeout);
+    stm.write(fileContents, 1, 1);
+    stm.hflush();
+    
+    stm.write(fileContents, 2, 1);
+    Thread.sleep(timeout);
+    stm.hflush();
+    
+    stm.write(fileContents, 3, 1);
+    Thread.sleep(timeout);
+    stm.write(fileContents, 4, 1);
+    stm.hflush();
+    
+    stm.write(fileContents, 5, 1);
+    Thread.sleep(timeout);
+    stm.close();
+
+    // verify that entire file is good
+    AppendTestUtil.checkFullFile(fs, p, fileLen,
+        fileContents, "Failed to slowly write to a file");
+  }
 }
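
For reference, the arithmetic that makes testPipelineHeartbeat meaningful: with
DFS_CLIENT_SOCKET_TIMEOUT_KEY set to 2000 ms, the streamer's heartbeat interval
is 2000/2 = 1000 ms, so every Thread.sleep(2000) between writes spans at least
one heartbeat. Before this patch the same idle gaps would have exhausted the
datanodes' 2-second read timeouts and torn down the pipeline; with it, the
write, hflush, and close calls should all succeed and checkFullFile should read
the six bytes back intact.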