You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/22 05:38:23 UTC
svn commit: r893066 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/aop/org/apache/hadoop/fi/ src/test/aop/org/apache/hadoop/hdfs/
src/test/aop/org/apache/hadoop/hdfs/server/datanode/
Author: hairong
Date: Tue Dec 22 04:38:16 2009
New Revision: 893066
URL: http://svn.apache.org/viewvc?rev=893066&view=rev
Log:
HDFS-564. Adding pipeline tests 17-35. Contributed by Nicholas, Kan, and Hairong.
Added:
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 22 04:38:16 2009
@@ -576,6 +576,8 @@
HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
(hairong)
+ HDFS-564. Adding pipeline tests 17-35. (hairong)
+
Release 0.20.2 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 22 04:38:16 2009
@@ -432,6 +432,14 @@
return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
}
+ /**
+ * Write the received packet to disk (data only)
+ */
+ private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
+ int numBytesToDisk) throws IOException {
+ out.write(pktBuf, startByteToDisk, numBytesToDisk);
+ }
+
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
@@ -524,7 +532,7 @@
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
- out.write(pktBuf, startByteToDisk, numBytesToDisk);
+ writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Tue Dec 22 04:38:16 2009
@@ -59,30 +59,36 @@
private volatile boolean isSuccess = false;
/** Simulate action for the receiverOpWriteBlock pointcut */
- public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiReceiverOpWriteBlock
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the callReceivePacket pointcut */
- public final ActionContainer<DatanodeID> fiCallReceivePacket
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiCallReceivePacket
+ = new ActionContainer<DatanodeID, IOException>();
+ /** Simulate action for the callWritePacketToDisk pointcut */
+ public final ActionContainer<DatanodeID, IOException> fiCallWritePacketToDisk
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the statusRead pointcut */
- public final ActionContainer<DatanodeID> fiStatusRead
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiStatusRead
+ = new ActionContainer<DatanodeID, IOException>();
+ /** Simulate action for the afterDownstreamStatusRead pointcut */
+ public final ActionContainer<DatanodeID, IOException> fiAfterDownstreamStatusRead
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the pipelineAck pointcut */
- public final ActionContainer<DatanodeID> fiPipelineAck
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiPipelineAck
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the pipelineClose pointcut */
- public final ActionContainer<DatanodeID> fiPipelineClose
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiPipelineClose
+ = new ActionContainer<DatanodeID, IOException>();
/** Simulate action for the blockFileClose pointcut */
- public final ActionContainer<DatanodeID> fiBlockFileClose
- = new ActionContainer<DatanodeID>();
+ public final ActionContainer<DatanodeID, IOException> fiBlockFileClose
+ = new ActionContainer<DatanodeID, IOException>();
/** Verification action for the pipelineInitNonAppend pointcut */
- public final ActionContainer<Integer> fiPipelineInitErrorNonAppend
- = new ActionContainer<Integer>();
+ public final ActionContainer<Integer, RuntimeException> fiPipelineInitErrorNonAppend
+ = new ActionContainer<Integer, RuntimeException>();
/** Verification action for the pipelineErrorAfterInit pointcut */
- public final ActionContainer<Integer> fiPipelineErrorAfterInit
- = new ActionContainer<Integer>();
+ public final ActionContainer<Integer, RuntimeException> fiPipelineErrorAfterInit
+ = new ActionContainer<Integer, RuntimeException>();
/** Get test status */
public boolean isSuccess() {
@@ -121,7 +127,8 @@
}
/** Action for DataNode */
- public static abstract class DataNodeAction implements Action<DatanodeID> {
+ public static abstract class DataNodeAction implements
+ Action<DatanodeID, IOException> {
/** The name of the test */
final String currentTest;
/** The index of the datanode */
@@ -195,6 +202,28 @@
}
}
+ /** Throws OutOfMemoryError if the count is zero. */
+ public static class CountdownOomAction extends OomAction {
+ private final CountdownConstraint countdown;
+
+ /** Create an action for datanode i in the pipeline with count down. */
+ public CountdownOomAction(String currentTest, int i, int count) {
+ super(currentTest, i);
+ countdown = new CountdownConstraint(count);
+ }
+
+ @Override
+ public void run(DatanodeID id) {
+ final DataTransferTest test = getDataTransferTest();
+ final Pipeline p = test.getPipeline(id);
+ if (p.contains(index, id) && countdown.isSatisfied()) {
+ final String s = toString(id);
+ FiTestUtil.LOG.info(s);
+ throw new OutOfMemoryError(s);
+ }
+ }
+ }
+
/** Throws DiskOutOfSpaceException. */
public static class DoosAction extends DataNodeAction {
/** Create an action for datanode i in the pipeline. */
@@ -242,6 +271,28 @@
}
}
+ /** Throws DiskOutOfSpaceException if the count is zero. */
+ public static class CountdownDoosAction extends DoosAction {
+ private final CountdownConstraint countdown;
+
+ /** Create an action for datanode i in the pipeline with count down. */
+ public CountdownDoosAction(String currentTest, int i, int count) {
+ super(currentTest, i);
+ countdown = new CountdownConstraint(count);
+ }
+
+ @Override
+ public void run(DatanodeID id) throws DiskOutOfSpaceException {
+ final DataTransferTest test = getDataTransferTest();
+ final Pipeline p = test.getPipeline(id);
+ if (p.contains(index, id) && countdown.isSatisfied()) {
+ final String s = toString(id);
+ FiTestUtil.LOG.info(s);
+ throw new DiskOutOfSpaceException(s);
+ }
+ }
+ }
+
/**
* Sleep some period of time so that it slows down the datanode
* or sleep forever so that datanode becomes not responding.
@@ -307,8 +358,50 @@
}
}
+ /**
+ * When the count is zero,
+ * sleep some period of time so that it slows down the datanode
+ * or sleep forever so that datanode becomes not responding.
+ */
+ public static class CountdownSleepAction extends SleepAction {
+ private final CountdownConstraint countdown;
+
+ /**
+ * Create an action for datanode i in the pipeline with count down.
+ * @param duration In milliseconds, duration <= 0 means sleeping forever.
+ */
+ public CountdownSleepAction(String currentTest, int i,
+ long duration, int count) {
+ this(currentTest, i, duration, duration+1, count);
+ }
+
+ /** Create an action for datanode i in the pipeline with count down. */
+ public CountdownSleepAction(String currentTest, int i,
+ long minDuration, long maxDuration, int count) {
+ super(currentTest, i, minDuration, maxDuration);
+ countdown = new CountdownConstraint(count);
+ }
+
+ @Override
+ public void run(DatanodeID id) {
+ final DataTransferTest test = getDataTransferTest();
+ final Pipeline p = test.getPipeline(id);
+ if (p.contains(index, id) && countdown.isSatisfied()) {
+ final String s = toString(id) + ", duration = ["
+ + minDuration + "," + maxDuration + ")";
+ FiTestUtil.LOG.info(s);
+ if (maxDuration <= 1) {
+ for(; true; FiTestUtil.sleep(1000)); //sleep forever
+ } else {
+ FiTestUtil.sleep(minDuration, maxDuration);
+ }
+ }
+ }
+ }
+
/** Action for pipeline error verification */
- public static class VerificationAction implements Action<Integer> {
+ public static class VerificationAction implements
+ Action<Integer, RuntimeException> {
/** The name of the test */
final String currentTest;
/** The error index of the datanode */
@@ -343,9 +436,10 @@
* Create a OomAction with a CountdownConstraint
* so that it throws OutOfMemoryError if the count is zero.
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownOomAction(
- String currentTest, int i, int count) {
- return new ConstraintSatisfactionAction<DatanodeID>(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException>
+ createCountdownOomAction(
+ String currentTest, int i, int count) {
+ return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new OomAction(currentTest, i), new CountdownConstraint(count));
}
@@ -353,9 +447,10 @@
* Create a DoosAction with a CountdownConstraint
* so that it throws DiskOutOfSpaceException if the count is zero.
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownDoosAction(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException>
+ createCountdownDoosAction(
String currentTest, int i, int count) {
- return new ConstraintSatisfactionAction<DatanodeID>(
+ return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new DoosAction(currentTest, i), new CountdownConstraint(count));
}
@@ -366,9 +461,9 @@
* sleep some period of time so that it slows down the datanode
* or sleep forever so the that datanode becomes not responding.
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
String currentTest, int i, long minDuration, long maxDuration, int count) {
- return new ConstraintSatisfactionAction<DatanodeID>(
+ return new ConstraintSatisfactionAction<DatanodeID, IOException>(
new SleepAction(currentTest, i, minDuration, maxDuration),
new CountdownConstraint(count));
}
@@ -377,7 +472,7 @@
* Same as
* createCountdownSleepAction(currentTest, i, duration, duration+1, count).
*/
- public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+ public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
String currentTest, int i, long duration, int count) {
return createCountdownSleepAction(currentTest, i, duration, duration+1,
count);
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java Tue Dec 22 04:38:16 2009
@@ -59,9 +59,9 @@
/** Class adds new type of action */
public static class HFlushTest extends DataTransferTest {
- public final ActionContainer<DatanodeID> fiCallHFlush =
- new ActionContainer<DatanodeID>();
- public final ActionContainer<Integer> fiErrorOnCallHFlush =
- new ActionContainer<Integer>();
+ public final ActionContainer<DatanodeID, IOException> fiCallHFlush =
+ new ActionContainer<DatanodeID, IOException>();
+ public final ActionContainer<Integer, RuntimeException> fiErrorOnCallHFlush =
+ new ActionContainer<Integer, RuntimeException>();
}
}
\ No newline at end of file
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java Tue Dec 22 04:38:16 2009
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.fi;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -95,24 +96,23 @@
}
/** Action interface */
- public static interface Action<T> {
+ public static interface Action<T, E extends Exception> {
/** Run the action with the parameter. */
- public void run(T parameter) throws IOException;
+ public void run(T parameter) throws E;
}
/** An ActionContainer contains at most one action. */
- public static class ActionContainer<T> {
- private Action<T> action;
-
+ public static class ActionContainer<T, E extends Exception> {
+ private List<Action<T, E>> actionList = new ArrayList<Action<T, E>>();
/** Create an empty container. */
public ActionContainer() {}
/** Set action. */
- public void set(Action<T> a) {action = a;}
+ public void set(Action<T, E> a) {actionList.add(a);}
/** Run the action if it exists. */
- public void run(T obj) throws IOException {
- if (action != null) {
+ public void run(T obj) throws E {
+ for (Action<T, E> action : actionList) {
action.run(obj);
}
}
@@ -147,13 +147,14 @@
}
/** An action is fired if all the constraints are satisfied. */
- public static class ConstraintSatisfactionAction<T> implements Action<T> {
- private final Action<T> action;
+ public static class ConstraintSatisfactionAction<T, E extends Exception>
+ implements Action<T, E> {
+ private final Action<T, E> action;
private final Constraint[] constraints;
/** Constructor */
public ConstraintSatisfactionAction(
- Action<T> action, Constraint... constraints) {
+ Action<T, E> action, Constraint... constraints) {
this.action = action;
this.constraints = constraints;
}
@@ -163,7 +164,7 @@
* Short-circuit-and is used.
*/
@Override
- public final void run(T parameter) throws IOException {
+ public final void run(T parameter) throws E {
for(Constraint c : constraints) {
if (!c.isSatisfied()) {
return;
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj Tue Dec 22 04:38:16 2009
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
@@ -49,14 +47,10 @@
after(DataStreamer datastreamer) returning : pipelineInitNonAppend(datastreamer) {
LOG.info("FI: after pipelineInitNonAppend: hasError="
+ datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
- try {
- if (datastreamer.hasError) {
- DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
- if (dtTest != null )
- dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (datastreamer.hasError) {
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
}
}
@@ -78,13 +72,9 @@
before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
+ datastreamer.errorIndex);
- try {
- DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
- if (dtTest != null )
- dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null )
+ dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
}
pointcut pipelineClose(DFSOutputStream out):
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java Tue Dec 22 04:38:16 2009
@@ -39,7 +39,7 @@
/**
* Storing acknowleged bytes num. action for fault injection tests
*/
- public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes> {
+ public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
String name;
LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -77,7 +77,7 @@
/**
* Storing acknowleged bytes num. action for fault injection tests
*/
- public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes> {
+ public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
String name;
LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -118,10 +118,10 @@
LinkedList<NodeBytes> received = new LinkedList<NodeBytes>();
LinkedList<NodeBytes> acked = new LinkedList<NodeBytes>();
- public final ActionContainer<NodeBytes> fiCallSetNumBytes =
- new ActionContainer<NodeBytes>();
- public final ActionContainer<NodeBytes> fiCallSetBytesAcked =
- new ActionContainer<NodeBytes>();
+ public final ActionContainer<NodeBytes, IOException> fiCallSetNumBytes =
+ new ActionContainer<NodeBytes, IOException>();
+ public final ActionContainer<NodeBytes, IOException> fiCallSetBytesAcked =
+ new ActionContainer<NodeBytes, IOException>();
private static boolean suspend = false;
private static long lastQueuedPacket = -1;
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 22 04:38:16 2009
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.Pipeline;
import org.apache.hadoop.fi.PipelineTest;
import org.apache.hadoop.fi.ProbabilityModel;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
@@ -44,12 +45,7 @@
public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
pointcut callReceivePacket(BlockReceiver blockreceiver) :
- call (* OutputStream.write(..))
- && withincode (* BlockReceiver.receivePacket(..))
-// to further limit the application of this aspect a very narrow 'target' can be used as follows
-// && target(DataOutputStream)
- && !within(BlockReceiverAspects +)
- && this(blockreceiver);
+ call(* receivePacket(..)) && target(blockreceiver);
before(BlockReceiver blockreceiver
) throws IOException : callReceivePacket(blockreceiver) {
@@ -67,7 +63,30 @@
}
}
- // Pointcuts and advises for TestFiPipelines
+ pointcut callWritePacketToDisk(BlockReceiver blockreceiver) :
+ call(* writePacketToDisk(..)) && target(blockreceiver);
+
+ before(BlockReceiver blockreceiver
+ ) throws IOException : callWritePacketToDisk(blockreceiver) {
+ LOG.info("FI: callWritePacketToDisk");
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiCallWritePacketToDisk.run(
+ blockreceiver.getDataNode().getDatanodeRegistration());
+ }
+
+ pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
+ call(void PipelineAck.readFields(DataInput)) && this(responder);
+
+ after(BlockReceiver.PacketResponder responder)
+ throws IOException: afterDownstreamStatusRead(responder) {
+ final DataNode d = responder.receiver.getDataNode();
+ DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+ if (dtTest != null)
+ dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeRegistration());
+ }
+
+ // Pointcuts and advice for TestFiPipelines
pointcut callSetNumBytes(BlockReceiver br, long offset) :
call (void ReplicaInPipelineInterface.setNumBytes(long))
&& withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Tue Dec 22 04:38:16 2009
@@ -19,6 +19,7 @@
import java.io.IOException;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.FiTestUtil;
@@ -101,7 +102,7 @@
}
private static void runReceiverOpWriteBlockTest(String methodName,
- int errorIndex, Action<DatanodeID> a) throws IOException {
+ int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
@@ -113,7 +114,7 @@
}
private static void runStatusReadTest(String methodName, int errorIndex,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
@@ -124,11 +125,11 @@
Assert.assertTrue(t.isSuccess());
}
- private static void runCallReceivePacketTest(String methodName,
- int errorIndex, Action<DatanodeID> a) throws IOException {
+ private static void runCallWritePacketToDisk(String methodName,
+ int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
- t.fiCallReceivePacket.set(a);
+ t.fiCallWritePacketToDisk.set(a);
t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex));
write1byte(methodName);
Assert.assertTrue(t.isSuccess());
@@ -280,7 +281,7 @@
@Test
public void pipeline_Fi_14() throws IOException {
final String methodName = FiTestUtil.getMethodName();
- runCallReceivePacketTest(methodName, 0, new DoosAction(methodName, 0));
+ runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 0));
}
/**
@@ -291,7 +292,7 @@
@Test
public void pipeline_Fi_15() throws IOException {
final String methodName = FiTestUtil.getMethodName();
- runCallReceivePacketTest(methodName, 1, new DoosAction(methodName, 1));
+ runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1));
}
/**
@@ -302,11 +303,11 @@
@Test
public void pipeline_Fi_16() throws IOException {
final String methodName = FiTestUtil.getMethodName();
- runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2));
+ runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
}
private static void runPipelineCloseTest(String methodName,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
@@ -324,7 +325,7 @@
final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
final MarkerConstraint marker = new MarkerConstraint(name);
t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
- t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID>(a, marker));
+ t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
write1byte(name);
}
@@ -442,7 +443,7 @@
}
private static void runBlockFileCloseTest(String methodName,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();
Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java?rev=893066&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Tue Dec 22 04:38:16 2009
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.log4j.Level;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiDataTransferProtocol2 {
+ static final short REPLICATION = 3;
+ static final long BLOCKSIZE = 1L * (1L << 20);
+ static final int PACKET_SIZE = 1024;
+ static final int MIN_N_PACKET = 3;
+ static final int MAX_N_PACKET = 10;
+
+ static final Configuration conf = new Configuration();
+ static {
+ conf.setInt("dfs.datanode.handler.count", 1);
+ conf.setInt("dfs.replication", REPLICATION);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
+ conf.setInt("dfs.socket.timeout", 5000);
+ }
+
+ static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE];
+ static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE];
+
+ static private FSDataOutputStream createFile(FileSystem fs, Path p
+ ) throws IOException {
+ return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+ REPLICATION, BLOCKSIZE);
+ }
+
+ {
+ ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+ /**
+ * 1. create a file in dfs
+ * 2. write a random number of packets, from MIN_N_PACKET to MAX_N_PACKET
+ * 3. close the file
+ * 4. open the same file
+ * 5. read the bytes back and compare them with the bytes written
+ */
+ private static void writeSeveralPackets(String methodName) throws IOException {
+ final Random r = FiTestUtil.RANDOM.get();
+ final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1);
+ final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1);
+ final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize;
+
+ FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
+ + ", lastPacketSize=" + lastPacketSize);
+
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
+ final FileSystem dfs = cluster.getFileSystem();
+ try {
+ final Path p = new Path("/" + methodName + "/foo");
+ final FSDataOutputStream out = createFile(dfs, p);
+
+ final long seed = r.nextLong();
+ final Random ran = new Random(seed);
+ ran.nextBytes(bytes);
+ out.write(bytes, 0, size);
+ out.close();
+
+ final FSDataInputStream in = dfs.open(p);
+ int totalRead = 0;
+ int nRead = 0;
+ while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) {
+ totalRead += nRead;
+ }
+ Assert.assertEquals("Cannot read file.", size, totalRead);
+ for (int i = 0; i < size; i++) {
+ Assert.assertTrue("File content differ.", bytes[i] == toRead[i]);
+ }
+ }
+ finally {
+ dfs.close();
+ cluster.shutdown();
+ }
+ }
+
+ private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a)
+ throws IOException {
+ t.fiCallReceivePacket.set(a);
+ t.fiReceiverOpWriteBlock.set(a);
+ t.fiStatusRead.set(a);
+ }
+
+ private void runTest17_19(String methodName, int dnIndex)
+ throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final int maxSleep = 3000;
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+ t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3));
+ t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+ writeSeveralPackets(methodName);
+ Assert.assertTrue(t.isSuccess());
+ }
+
+ private void runTest29_30(String methodName, int dnIndex) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final int maxSleep = 3000;
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+ initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+ t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3));
+ t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+ writeSeveralPackets(methodName);
+ Assert.assertTrue(t.isSuccess());
+ }
+
+ private void runTest34_35(String methodName, int dnIndex) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3));
+ t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+ writeSeveralPackets(methodName);
+ Assert.assertTrue(t.isSuccess());
+ }
+ /**
+ * Streaming:
+ * Randomize datanode speed, write several packets,
+ * DN0 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+ * Client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_17() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest17_19(methodName, 0);
+ }
+
+ /**
+ * Streaming:
+ * Randomize datanode speed, write several packets,
+ * DN1 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+ * Client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_18() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest17_19(methodName, 1);
+ }
+
+ /**
+ * Streaming:
+ * Randomize datanode speed, write several packets,
+ * DN2 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+ * Client gets an IOException and determines that DN2 is bad.
+ */
+ @Test
+ public void pipeline_Fi_19() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest17_19(methodName, 2);
+ }
+
+ /**
+ * Streaming: Client writes several packets with DN0 very slow. Client
+ * finishes write successfully.
+ */
+ @Test
+ public void pipeline_Fi_20() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 0, 3000));
+ writeSeveralPackets(methodName);
+ }
+
+ /**
+ * Streaming: Client writes several packets with DN1 very slow. Client
+ * finishes write successfully.
+ */
+ @Test
+ public void pipeline_Fi_21() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 1, 3000));
+ writeSeveralPackets(methodName);
+ }
+
+ /**
+ * Streaming: Client writes several packets with DN2 very slow. Client
+ * finishes write successfully.
+ */
+ @Test
+ public void pipeline_Fi_22() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ initSlowDatanodeTest(t, new SleepAction(methodName, 2, 3000));
+ writeSeveralPackets(methodName);
+ }
+
+ /**
+ * Streaming: Randomize datanode speed, write several packets, DN1 throws an
+ * OutOfMemoryError when it receives the ack of the third packet from DN2.
+ * Client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_29() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest29_30(methodName, 1);
+ }
+
+ /**
+ * Streaming: Randomize datanode speed, write several packets, DN0 throws an
+ * OutOfMemoryError when it receives the ack of the third packet from DN1.
+ * Client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_30() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest29_30(methodName, 0);
+ }
+
+ /**
+ * Streaming: Write several packets, DN1 never responds when it receives the
+ * ack of the third packet from DN2. Client gets an IOException and determines
+ * that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_34() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest34_35(methodName, 1);
+ }
+
+ /**
+ * Streaming: Write several packets, DN0 never responds when it receives the
+ * ack of the third packet from DN1. Client gets an IOException and determines
+ * that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_35() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runTest34_35(methodName, 0);
+ }
+}
\ No newline at end of file
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java?rev=893066&r1=893065&r2=893066&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Tue Dec 22 04:38:16 2009
@@ -83,7 +83,7 @@
}
private static void runPipelineCloseTest(String methodName,
- Action<DatanodeID> a) throws IOException {
+ Action<DatanodeID, IOException> a) throws IOException {
FiTestUtil.LOG.info("Running " + methodName + " ...");
final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
.initTest();