Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/22 05:38:23 UTC

svn commit: r893066 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/aop/org/apache/hadoop/fi/ src/test/aop/org/apache/hadoop/hdfs/ src/test/aop/org/apache/hadoop/hdfs/server/datanode/

Author: hairong
Date: Tue Dec 22 04:38:16 2009
New Revision: 893066

URL: http://svn.apache.org/viewvc?rev=893066&view=rev
Log:
HDFS-564. Adding pipeline tests 17-35. Contributed by Nicholas, Kan, and Hairong.

Added:
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 22 04:38:16 2009
@@ -576,6 +576,8 @@
     HDFS-724. Pipeline hangs if one of the block receivers is not responsive.
     (hairong)
 
+    HDFS-564. Adding pipeline tests 17-35. (hairong)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 22 04:38:16 2009
@@ -432,6 +432,14 @@
     return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
   }
 
+  /**
+   * Write the received packet to disk (data only)
+   */
+  private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, 
+      int numBytesToDisk) throws IOException {
+    out.write(pktBuf, startByteToDisk, numBytesToDisk);
+  }
+  
   /** 
    * Receives and processes a packet. It can contain many chunks.
    * Returns the number of data bytes that the packet has.
@@ -524,7 +532,7 @@
 
           int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
-          out.write(pktBuf, startByteToDisk, numBytesToDisk);
+          writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
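
This refactoring gives the AspectJ fault-injection build a dedicated join
point: instead of matching any OutputStream.write(..) inside receivePacket,
an aspect can now intercept the writePacketToDisk call directly (see the new
pointcut in BlockReceiverAspects.aj below). A minimal sketch of how a test
wires a fault into it, assuming the DataTransferTest plumbing from
DataTransferTestUtil (the test name "example" is illustrative):

    final DataTransferTest t =
        (DataTransferTest) DataTransferTestUtil.initTest();
    // Make DN1 fail its disk write: the before() advice on the
    // callWritePacketToDisk pointcut runs this action, so DN1 throws
    // DiskOutOfSpaceException before any packet bytes reach the block file.
    t.fiCallWritePacketToDisk.set(new DoosAction("example", 1));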

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Tue Dec 22 04:38:16 2009
@@ -59,30 +59,36 @@
     private volatile boolean isSuccess = false;
 
     /** Simulate action for the receiverOpWriteBlock pointcut */
-    public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiReceiverOpWriteBlock
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the callReceivePacket pointcut */
-    public final ActionContainer<DatanodeID> fiCallReceivePacket
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiCallReceivePacket
+        = new ActionContainer<DatanodeID, IOException>();
+    /** Simulate action for the callWritePacketToDisk pointcut */
+    public final ActionContainer<DatanodeID, IOException> fiCallWritePacketToDisk
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the statusRead pointcut */
-    public final ActionContainer<DatanodeID> fiStatusRead
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiStatusRead
+        = new ActionContainer<DatanodeID, IOException>();
+    /** Simulate action for the afterDownstreamStatusRead pointcut */
+    public final ActionContainer<DatanodeID, IOException> fiAfterDownstreamStatusRead
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the pipelineAck pointcut */
-    public final ActionContainer<DatanodeID> fiPipelineAck
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiPipelineAck
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the pipelineClose pointcut */
-    public final ActionContainer<DatanodeID> fiPipelineClose
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiPipelineClose
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the blockFileClose pointcut */
-    public final ActionContainer<DatanodeID> fiBlockFileClose
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiBlockFileClose
+        = new ActionContainer<DatanodeID, IOException>();
 
     /** Verification action for the pipelineInitNonAppend pointcut */
-    public final ActionContainer<Integer> fiPipelineInitErrorNonAppend
-        = new ActionContainer<Integer>();
+    public final ActionContainer<Integer, RuntimeException> fiPipelineInitErrorNonAppend
+        = new ActionContainer<Integer, RuntimeException>();
     /** Verification action for the pipelineErrorAfterInit pointcut */
-    public final ActionContainer<Integer> fiPipelineErrorAfterInit
-        = new ActionContainer<Integer>();
+    public final ActionContainer<Integer, RuntimeException> fiPipelineErrorAfterInit
+        = new ActionContainer<Integer, RuntimeException>();
 
     /** Get test status */
     public boolean isSuccess() {
@@ -121,7 +127,8 @@
   }
 
   /** Action for DataNode */
-  public static abstract class DataNodeAction implements Action<DatanodeID> {
+  public static abstract class DataNodeAction implements
+      Action<DatanodeID, IOException> {
     /** The name of the test */
     final String currentTest;
     /** The index of the datanode */
@@ -195,6 +202,28 @@
     }
   }
 
+  /** Throws OutOfMemoryError if the count is zero. */
+  public static class CountdownOomAction extends OomAction {
+    private final CountdownConstraint countdown;
+
+    /** Create an action for datanode i in the pipeline with count down. */
+    public CountdownOomAction(String currentTest, int i, int count) {
+      super(currentTest, i);
+      countdown = new CountdownConstraint(count);
+    }
+
+    @Override
+    public void run(DatanodeID id) {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id) && countdown.isSatisfied()) {
+        final String s = toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new OutOfMemoryError(s);
+      }
+    }
+  }
+
   /** Throws DiskOutOfSpaceException. */
   public static class DoosAction extends DataNodeAction {
     /** Create an action for datanode i in the pipeline. */
@@ -242,6 +271,28 @@
     }
   }
 
+  /** Throws DiskOutOfSpaceException if the count is zero. */
+  public static class CountdownDoosAction extends DoosAction {
+    private final CountdownConstraint countdown;
+
+    /** Create an action for datanode i in the pipeline with count down. */
+    public CountdownDoosAction(String currentTest, int i, int count) {
+      super(currentTest, i);
+      countdown = new CountdownConstraint(count);
+    }
+
+    @Override
+    public void run(DatanodeID id) throws DiskOutOfSpaceException {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id) && countdown.isSatisfied()) {
+        final String s = toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new DiskOutOfSpaceException(s);
+      }
+    }
+  }
+
   /**
    * Sleep for some period of time to slow down the datanode,
    * or sleep forever so that the datanode becomes unresponsive.
@@ -307,8 +358,50 @@
     }
   }
 
+  /**
+   * When the count reaches zero,
+   * sleep for some period of time to slow down the datanode,
+   * or sleep forever so that the datanode becomes unresponsive.
+   */
+  public static class CountdownSleepAction extends SleepAction {
+    private final CountdownConstraint countdown;
+
+    /**
+     * Create an action for datanode i in the pipeline.
+     * @param duration In milliseconds, duration <= 0 means sleeping forever.
+     */
+    public CountdownSleepAction(String currentTest, int i,
+        long duration, int count) {
+      this(currentTest, i, duration, duration+1, count);
+    }
+
+    /** Create an action for datanode i in the pipeline with count down. */
+    public CountdownSleepAction(String currentTest, int i,
+        long minDuration, long maxDuration, int count) {
+      super(currentTest, i, minDuration, maxDuration);
+      countdown = new CountdownConstraint(count);
+    }
+
+    @Override
+    public void run(DatanodeID id) {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id) && countdown.isSatisfied()) {
+        final String s = toString(id) + ", duration = ["
+            + minDuration + "," + maxDuration + ")";
+        FiTestUtil.LOG.info(s);
+        if (maxDuration <= 1) {
+          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+        } else {
+          FiTestUtil.sleep(minDuration, maxDuration);
+        }
+      }
+    }
+  }
+
   /** Action for pipeline error verification */
-  public static class VerificationAction implements Action<Integer> {
+  public static class VerificationAction implements
+      Action<Integer, RuntimeException> {
     /** The name of the test */
     final String currentTest;
     /** The error index of the datanode */
@@ -343,9 +436,10 @@
    *  Create an OomAction with a CountdownConstraint
    *  so that it throws OutOfMemoryError if the count is zero.
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownOomAction(
-      String currentTest, int i, int count) {
-    return new ConstraintSatisfactionAction<DatanodeID>(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException>
+      createCountdownOomAction(
+        String currentTest, int i, int count) {
+    return new ConstraintSatisfactionAction<DatanodeID, IOException>(
         new OomAction(currentTest, i), new CountdownConstraint(count));
   }
 
@@ -353,9 +447,10 @@
    *  Create a DoosAction with a CountdownConstraint
    *  so that it throws DiskOutOfSpaceException if the count is zero.
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownDoosAction(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException>
+    createCountdownDoosAction(
       String currentTest, int i, int count) {
-    return new ConstraintSatisfactionAction<DatanodeID>(
+    return new ConstraintSatisfactionAction<DatanodeID, IOException>(
         new DoosAction(currentTest, i), new CountdownConstraint(count));
   }
 
@@ -366,9 +461,9 @@
    * sleep for some period of time to slow down the datanode,
    * or sleep forever so that the datanode becomes unresponsive.
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
       String currentTest, int i, long minDuration, long maxDuration, int count) {
-    return new ConstraintSatisfactionAction<DatanodeID>(
+    return new ConstraintSatisfactionAction<DatanodeID, IOException>(
         new SleepAction(currentTest, i, minDuration, maxDuration),
         new CountdownConstraint(count));
   }
@@ -377,7 +472,7 @@
    * Same as
    * createCountdownSleepAction(currentTest, i, duration, duration+1, count).
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
       String currentTest, int i, long duration, int count) {
     return createCountdownSleepAction(currentTest, i, duration, duration+1,
         count);
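
The new Countdown*Action classes fold a CountdownConstraint into the action
itself: the constraint counts invocations down and is only satisfied once the
count reaches zero, so the fault fires on the n-th time the advice runs at the
target datanode rather than the first. A short sketch of the pattern (test
name and datanode index are illustrative; this mirrors what the new
TestFiDataTransferProtocol2 tests below do):

    final DataTransferTest t =
        (DataTransferTest) DataTransferTestUtil.initTest();
    // DN1 throws DiskOutOfSpaceException on its 3rd packet write ...
    t.fiCallWritePacketToDisk.set(new CountdownDoosAction("example", 1, 3));
    // ... and the test verifies that the client marks DN1 as the bad node.
    t.fiPipelineErrorAfterInit.set(new VerificationAction("example", 1));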

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java Tue Dec 22 04:38:16 2009
@@ -59,9 +59,9 @@
   
   /** Class adds new type of action */
   public static class HFlushTest extends DataTransferTest {
-    public final ActionContainer<DatanodeID> fiCallHFlush = 
-      new ActionContainer<DatanodeID>();
-    public final ActionContainer<Integer> fiErrorOnCallHFlush = 
-      new ActionContainer<Integer>();
+    public final ActionContainer<DatanodeID, IOException> fiCallHFlush = 
+      new ActionContainer<DatanodeID, IOException>();
+    public final ActionContainer<Integer, RuntimeException> fiErrorOnCallHFlush = 
+      new ActionContainer<Integer, RuntimeException>();
   }
 }
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java Tue Dec 22 04:38:16 2009
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.fi;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -95,24 +96,23 @@
   }
 
   /** Action interface */
-  public static interface Action<T> {
+  public static interface Action<T, E extends Exception> {
     /** Run the action with the parameter. */
-    public void run(T parameter) throws IOException;
+    public void run(T parameter) throws E;
   }
 
   /** An ActionContainer contains a list of actions. */
-  public static class ActionContainer<T> {
-    private Action<T> action;
-
+  public static class ActionContainer<T, E extends Exception> {
+    private List<Action<T, E>> actionList = new ArrayList<Action<T, E>>();
     /** Create an empty container. */
     public ActionContainer() {}
 
     /** Set action. */
-    public void set(Action<T> a) {action = a;}
+    public void set(Action<T, E> a) {actionList.add(a);}
 
     /** Run the action if it exists. */
-    public void run(T obj) throws IOException {
-      if (action != null) {
+    public void run(T obj) throws E {
+      for (Action<T, E> action : actionList) {
         action.run(obj);
       }
     }
@@ -147,13 +147,14 @@
   }
   
   /** An action is fired if all the constraints are satisfied. */
-  public static class ConstraintSatisfactionAction<T> implements Action<T> {
-    private final Action<T> action;
+  public static class ConstraintSatisfactionAction<T, E extends Exception> 
+      implements Action<T, E> {
+    private final Action<T, E> action;
     private final Constraint[] constraints;
     
     /** Constructor */
     public ConstraintSatisfactionAction(
-        Action<T> action, Constraint... constraints) {
+        Action<T, E> action, Constraint... constraints) {
       this.action = action;
       this.constraints = constraints;
     }
@@ -163,7 +164,7 @@
      * Short-circuit-and is used. 
      */
     @Override
-    public final void run(T parameter) throws IOException {
+    public final void run(T parameter) throws E {
       for(Constraint c : constraints) {
         if (!c.isSatisfied()) {
           return;
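
Two behavioral points of this change are easy to miss. First, Action is now
generic in its exception type, so hooks that can only fail at run time (the
pipeline verification actions) declare RuntimeException, and their callers no
longer need the catch-and-rewrap seen in DFSClientAspects.aj below. Second,
ActionContainer.set() no longer replaces the stored action: it appends to a
list, and run() fires every registered action in order. A minimal sketch,
assuming FiTestUtil's nested types are in scope (the action itself is
hypothetical):

    final ActionContainer<String, IOException> container =
        new ActionContainer<String, IOException>();
    container.set(new Action<String, IOException>() {
      @Override
      public void run(String parameter) throws IOException {
        // The checked exception type surfaces as-is, unwrapped.
        throw new IOException("injected failure at " + parameter);
      }
    });
    container.run("datanode-0"); // throws the IOException directly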

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Tue Dec 22 04:38:16 2009
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
@@ -49,14 +47,10 @@
   after(DataStreamer datastreamer) returning : pipelineInitNonAppend(datastreamer) {
     LOG.info("FI: after pipelineInitNonAppend: hasError="
         + datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
-    try {
-      if (datastreamer.hasError) {
-        DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
-        if (dtTest != null )
-          dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    if (datastreamer.hasError) {
+      DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+      if (dtTest != null)
+        dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
     }
   }
 
@@ -78,13 +72,9 @@
   before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
     LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
         + datastreamer.errorIndex);
-    try {
-      DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
-      if (dtTest != null )
-        dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
   }
 
   pointcut pipelineClose(DFSOutputStream out):

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java Tue Dec 22 04:38:16 2009
@@ -39,7 +39,7 @@
   /**
    * Action for fault injection tests: verifies the number of received bytes
    */
-  public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes> {
+  public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
     String name;
     LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
     LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -77,7 +77,7 @@
   /**
    * Action for fault injection tests: verifies the number of acknowledged bytes
    */
-  public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes> {
+  public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
     String name;
     LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
     LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -118,10 +118,10 @@
     LinkedList<NodeBytes> received = new LinkedList<NodeBytes>();
     LinkedList<NodeBytes> acked = new LinkedList<NodeBytes>();
 
-    public final ActionContainer<NodeBytes> fiCallSetNumBytes =
-      new ActionContainer<NodeBytes>();
-    public final ActionContainer<NodeBytes> fiCallSetBytesAcked =
-      new ActionContainer<NodeBytes>();
+    public final ActionContainer<NodeBytes, IOException> fiCallSetNumBytes =
+      new ActionContainer<NodeBytes, IOException>();
+    public final ActionContainer<NodeBytes, IOException> fiCallSetBytesAcked =
+      new ActionContainer<NodeBytes, IOException>();
     
     private static boolean suspend = false;
     private static long lastQueuedPacket = -1;

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 22 04:38:16 2009
@@ -25,6 +25,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.Pipeline;
 import org.apache.hadoop.fi.PipelineTest;
 import org.apache.hadoop.fi.ProbabilityModel;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
@@ -44,12 +45,7 @@
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
-    call (* OutputStream.write(..))
-      && withincode (* BlockReceiver.receivePacket(..))
-// to further limit the application of this aspect a very narrow 'target' can be used as follows
-//  && target(DataOutputStream)
-      && !within(BlockReceiverAspects +)
-      && this(blockreceiver);
+    call(* receivePacket(..)) && target(blockreceiver);
 	
   before(BlockReceiver blockreceiver
       ) throws IOException : callReceivePacket(blockreceiver) {
@@ -67,7 +63,30 @@
     }
   }
   
-  // Pointcuts and advises for TestFiPipelines  
+  pointcut callWritePacketToDisk(BlockReceiver blockreceiver) :
+    call(* writePacketToDisk(..)) && target(blockreceiver);
+
+  before(BlockReceiver blockreceiver
+      ) throws IOException : callWritePacketToDisk(blockreceiver) {
+    LOG.info("FI: callWritePacketToDisk");
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiCallWritePacketToDisk.run(
+          blockreceiver.getDataNode().getDatanodeRegistration());
+  }
+
+  pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
+    call(void PipelineAck.readFields(DataInput)) && this(responder);
+
+  after(BlockReceiver.PacketResponder responder)
+      throws IOException: afterDownstreamStatusRead(responder) {
+    final DataNode d = responder.receiver.getDataNode();
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeRegistration());
+  }
+
+  // Pointcuts and advice for TestFiPipelines
   pointcut callSetNumBytes(BlockReceiver br, long offset) : 
     call (void ReplicaInPipelineInterface.setNumBytes(long)) 
     && withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
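
The new afterDownstreamStatusRead pointcut fires in the datanode's
PacketResponder immediately after it reads a PipelineAck from the downstream
mirror, which is the hook tests 29-35 need in order to inject failures at
ack-processing time. A sketch of the wiring, assuming the same test plumbing
as above (this is the pattern runTest29_30 and runTest34_35 in the new test
file use):

    // DN1 throws OutOfMemoryError after reading the 3rd downstream ack ...
    t.fiAfterDownstreamStatusRead.set(new CountdownOomAction("example", 1, 3));
    // ... or, with duration <= 0, sleeps forever instead (never acks upstream):
    // t.fiAfterDownstreamStatusRead.set(
    //     new CountdownSleepAction("example", 1, 0, 3));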

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Tue Dec 22 04:38:16 2009
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.FiTestUtil;
@@ -101,7 +102,7 @@
   }
   
   private static void runReceiverOpWriteBlockTest(String methodName,
-      int errorIndex, Action<DatanodeID> a) throws IOException {
+      int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
@@ -113,7 +114,7 @@
   }
   
   private static void runStatusReadTest(String methodName, int errorIndex,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
@@ -124,11 +125,11 @@
     Assert.assertTrue(t.isSuccess());
   }
 
-  private static void runCallReceivePacketTest(String methodName,
-      int errorIndex, Action<DatanodeID> a) throws IOException {
+  private static void runCallWritePacketToDisk(String methodName,
+      int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
-    t.fiCallReceivePacket.set(a);
+    t.fiCallWritePacketToDisk.set(a);
     t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex));
     write1byte(methodName);
     Assert.assertTrue(t.isSuccess());
@@ -280,7 +281,7 @@
   @Test
   public void pipeline_Fi_14() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, 0, new DoosAction(methodName, 0));
+    runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 0));
   }
 
   /**
@@ -291,7 +292,7 @@
   @Test
   public void pipeline_Fi_15() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, 1, new DoosAction(methodName, 1));
+    runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1));
   }
   
   /**
@@ -302,11 +303,11 @@
   @Test
   public void pipeline_Fi_16() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2));
+    runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
   }
 
   private static void runPipelineCloseTest(String methodName,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
@@ -324,7 +325,7 @@
     final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
     final MarkerConstraint marker = new MarkerConstraint(name);
     t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
-    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID>(a, marker));
+    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
     write1byte(name);
   }
 
@@ -442,7 +443,7 @@
   }
 
   private static void runBlockFileCloseTest(String methodName,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();

Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java?rev=893066&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Tue Dec 22 04:38:16 2009
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.log4j.Level;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiDataTransferProtocol2 {
+  static final short REPLICATION = 3;
+  static final long BLOCKSIZE = 1L * (1L << 20);
+  static final int PACKET_SIZE = 1024;
+  static final int MIN_N_PACKET = 3;
+  static final int MAX_N_PACKET = 10;
+
+  static final Configuration conf = new Configuration();
+  static {
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
+    conf.setInt("dfs.socket.timeout", 5000);
+  }
+
+  static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE];
+  static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE];
+
+  static private FSDataOutputStream createFile(FileSystem fs, Path p
+      ) throws IOException {
+    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+        REPLICATION, BLOCKSIZE);
+  }
+
+  {
+    ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+  /**
+   * 1. create a file with dfs
+   * 2. write MIN_N_PACKET to MAX_N_PACKET packets
+   * 3. close file
+   * 4. open the same file
+   * 5. read the bytes and compare results
+   */
+  private static void writeSeveralPackets(String methodName) throws IOException {
+    final Random r = FiTestUtil.RANDOM.get();
+    final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1);
+    final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1);
+    final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize;
+
+    FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
+        + ", lastPacketSize=" + lastPacketSize);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
+    final FileSystem dfs = cluster.getFileSystem();
+    try {
+      final Path p = new Path("/" + methodName + "/foo");
+      final FSDataOutputStream out = createFile(dfs, p);
+
+      final long seed = r.nextLong();
+      final Random ran = new Random(seed);
+      ran.nextBytes(bytes);
+      out.write(bytes, 0, size);
+      out.close();
+
+      final FSDataInputStream in = dfs.open(p);
+      int totalRead = 0;
+      int nRead = 0;
+      while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+      Assert.assertEquals("Cannot read file.", size, totalRead);
+      for (int i = 0; i < size; i++) {
+        Assert.assertTrue("File contents differ.", bytes[i] == toRead[i]);
+      }
+    }
+    finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+
+  private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a)
+      throws IOException {
+    t.fiCallReceivePacket.set(a);
+    t.fiReceiverOpWriteBlock.set(a);
+    t.fiStatusRead.set(a);
+  }
+
+  private void runTest17_19(String methodName, int dnIndex)
+      throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final int maxSleep = 3000;
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+    t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3));
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+    writeSeveralPackets(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+
+  private void runTest29_30(String methodName, int dnIndex) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final int maxSleep = 3000;
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+    t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3));
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+    writeSeveralPackets(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+  
+  private void runTest34_35(String methodName, int dnIndex) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3));
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+    writeSeveralPackets(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+  /**
+   * Streaming:
+   * Randomize datanode speed, write several packets,
+   * DN0 throws a DiskOutOfSpaceException when it writes the third packet to disk.
+   * Client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_17() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest17_19(methodName, 0);
+  }
+  
+  /**
+   * Streaming:
+   * Randomize datanode speed, write several packets,
+   * DN1 throws a DiskOutOfSpaceException when it writes the third packet to disk.
+   * Client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_18() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest17_19(methodName, 1);
+  }
+  
+  /**
+   * Streaming:
+   * Randomize datanode speed, write several packets,
+   * DN2 throws a DiskOutOfSpaceException when it writes the third packet to disk.
+   * Client gets an IOException and determines DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_19() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest17_19(methodName, 2);
+  }
+  
+  /**
+   * Streaming: Client writes several packets with DN0 very slow. Client
+   * finishes write successfully.
+   */
+  @Test
+  public void pipeline_Fi_20() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 3000));
+    writeSeveralPackets(methodName);
+  }
+
+  /**
+   * Streaming: Client writes several packets with DN1 very slow. Client
+   * finishes write successfully.
+   */
+  @Test
+  public void pipeline_Fi_21() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 3000));
+    writeSeveralPackets(methodName);
+  }
+  
+  /**
+   * Streaming: Client writes several packets with DN2 very slow. Client
+   * finishes write successfully.
+   */
+  @Test
+  public void pipeline_Fi_22() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 3000));
+    writeSeveralPackets(methodName);
+  }
+  
+  /**
+   * Streaming: Randomize datanode speed, write several packets, DN1 throws an
+   * OutOfMemoryError when it receives the ack of the third packet from DN2.
+   * Client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_29() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest29_30(methodName, 1);
+  }
+
+  /**
+   * Streaming: Randomize datanode speed, write several packets, DN0 throws an
+   * OutOfMemoryError when it receives the ack of the third packet from DN1.
+   * Client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_30() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest29_30(methodName, 0);
+  }
+  
+  /**
+   * Streaming: Write several packets, DN1 stops responding after it receives
+   * the ack of the third packet from DN2. Client gets an IOException and
+   * determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_34() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest34_35(methodName, 1);
+  }
+
+  /**
+   * Streaming: Write several packets, DN0 stops responding after it receives
+   * the ack of the third packet from DN1. Client gets an IOException and
+   * determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_35() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest34_35(methodName, 0);
+  }
+}
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Tue Dec 22 04:38:16 2009
@@ -83,7 +83,7 @@
   }
 
    private static void runPipelineCloseTest(String methodName,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();