Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/12/15 07:08:39 UTC
svn commit: r890655 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/aop/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/ha...
Author: hairong
Date: Tue Dec 15 06:08:39 2009
New Revision: 890655
URL: http://svn.apache.org/viewvc?rev=890655&view=rev
Log:
HDFS-724. Pipeline hangs when one of the block receivers is not responsive. Contributed by Hairong Kuang.
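
The heart of the change is on the client side: while a block is in the DATA_STREAMING stage and the data queue has been idle for half the socket timeout, the DataStreamer now fabricates a header-only heartbeat packet so every datanode in the pipeline keeps seeing traffic instead of timing out. A minimal sketch of that idle-detection loop, using stand-in names (Streamer, send, "HEARTBEAT") rather than the real DFSClient internals:

    import java.util.ArrayDeque;
    import java.util.Deque;

    /** Sketch only: the idle-heartbeat pattern this patch adds to DataStreamer.
     *  Names and constants are illustrative, not the Hadoop API. */
    public class Streamer {
      static final long SOCKET_TIMEOUT = 5000;           // ms; the new tests use this value
      private final Deque<String> dataQueue = new ArrayDeque<String>();
      private volatile boolean running = true;

      void run() throws InterruptedException {
        long lastPacket = System.currentTimeMillis();
        while (running) {
          String packet;
          synchronized (dataQueue) {
            long now = System.currentTimeMillis();
            // Sleep only until the next heartbeat falls due, not a fixed second.
            while (running && dataQueue.isEmpty()
                && now - lastPacket < SOCKET_TIMEOUT / 2) {
              long timeout = SOCKET_TIMEOUT / 2 - (now - lastPacket);
              dataQueue.wait(timeout <= 0 ? 1000 : timeout);
              now = System.currentTimeMillis();
            }
            // Idle past the deadline: fabricate a heartbeat instead of blocking.
            packet = dataQueue.isEmpty() ? "HEARTBEAT" : dataQueue.removeFirst();
          }
          send(packet);  // a heartbeat is written to the pipeline but never waits for an ack
          lastPacket = System.currentTimeMillis();
        }
      }

      void send(String p) { /* write to the block stream */ }
    }

The committed version additionally keeps a plain one-second poll outside the DATA_STREAMING stage, since heartbeats are only useful while a pipeline is open; the diff below shows the real logic.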
Added:
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Dec 15 06:08:39 2009
@@ -603,6 +603,9 @@
HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
name-node is in safemode. (Ravi Phulari via shv)
+ HDFS-724. Pipeline hangs if one of the block receivers is not responsive.
+ (hairong)
+
Release 0.20.1 - 2009-09-01
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Dec 15 06:08:39 2009
@@ -2475,7 +2475,27 @@
int dataPos;
int checksumStart;
int checksumPos;
-
+ private static final long HEART_BEAT_SEQNO = -1L;
+
+ /**
+ * create a heartbeat packet
+ */
+ Packet() {
+ this.lastPacketInBlock = false;
+ this.numChunks = 0;
+ this.offsetInBlock = 0;
+ this.seqno = HEART_BEAT_SEQNO;
+
+ buffer = null;
+ int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+ buf = new byte[packetSize];
+
+ checksumStart = dataStart = packetSize;
+ checksumPos = checksumStart;
+ dataPos = dataStart;
+ maxChunks = 0;
+ }
+
// create a new packet
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
@@ -2562,6 +2582,14 @@
return offsetInBlock + dataPos - dataStart;
}
+ /**
+ * Check whether this packet is a heartbeat packet
+ * @return true if the sequence number is HEART_BEAT_SEQNO
+ */
+ private boolean isHeartbeatPacket() {
+ return seqno == HEART_BEAT_SEQNO;
+ }
+
public String toString() {
return "packet seqno:" + this.seqno +
" offsetInBlock:" + this.offsetInBlock +
@@ -2680,6 +2708,7 @@
* and closes them. Any error recovery is also done by this thread.
*/
public void run() {
+ long lastPacket = System.currentTimeMillis();
while (!streamerClosed && clientRunning) {
// if the Responder encountered an error, shutdown Responder
@@ -2703,19 +2732,32 @@
synchronized (dataQueue) {
// wait for a packet to be sent.
+ long now = System.currentTimeMillis();
while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
+ && dataQueue.size() == 0 &&
+ (stage != BlockConstructionStage.DATA_STREAMING ||
+ stage == BlockConstructionStage.DATA_STREAMING &&
+ now - lastPacket < socketTimeout/2)) || doSleep ) {
+ long timeout = socketTimeout/2 - (now-lastPacket);
+ timeout = timeout <= 0 ? 1000 : timeout;
+ timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+ timeout : 1000;
try {
- dataQueue.wait(1000);
+ dataQueue.wait(timeout);
} catch (InterruptedException e) {
}
doSleep = false;
+ now = System.currentTimeMillis();
}
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ if (streamerClosed || hasError || !clientRunning) {
continue;
}
// get packet to be sent.
- one = dataQueue.getFirst();
+ if (dataQueue.isEmpty()) {
+ one = new Packet(); // heartbeat packet
+ } else {
+ one = dataQueue.getFirst(); // regular data packet
+ }
}
// get new block from namenode.
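
The wait in the hunk above is sized so the streamer wakes exactly when a heartbeat falls due. With the 5000 ms client socket timeout configured by the new tests, a streamer whose last packet left 2000 ms ago sleeps at most 5000/2 - 2000 = 500 ms before fabricating a heartbeat; if the deadline has already passed, or the pipeline is not in the DATA_STREAMING stage, it falls back to one-second polling. A small helper capturing the same arithmetic (the parameter names are illustrative, not part of the patch):

    /** Mirrors the committed timeout computation; not committed code. */
    static long waitTimeout(boolean streaming, long now, long lastPacket,
                            long socketTimeout) {
      if (!streaming) {
        return 1000;                              // plain polling outside DATA_STREAMING
      }
      long t = socketTimeout / 2 - (now - lastPacket);
      return t <= 0 ? 1000 : t;                   // overdue: re-check every second
    }

For example, waitTimeout(true, 2000, 0, 5000) returns 500.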
@@ -2761,9 +2803,11 @@
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
- dataQueue.removeFirst();
- ackQueue.addLast(one);
- dataQueue.notifyAll();
+ if (!one.isHeartbeatPacket()) {
+ dataQueue.removeFirst();
+ ackQueue.addLast(one);
+ dataQueue.notifyAll();
+ }
}
if (LOG.isDebugEnabled()) {
@@ -2774,6 +2818,10 @@
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
+ lastPacket = System.currentTimeMillis();
+
+ if (one.isHeartbeatPacket()) { //heartbeat packet
+ }
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2910,24 +2958,7 @@
}
long seqno = ack.getSeqno();
- Packet one = null;
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- continue;
- } else if (seqno == -2) {
- // no nothing
- } else {
- synchronized (dataQueue) {
- one = ackQueue.getFirst();
- }
- if (one.seqno != seqno) {
- throw new IOException("Responseprocessor: Expecting seqno " +
- " for block " + block +
- one.seqno + " but received " + seqno);
- }
- isLastPacketInBlock = one.lastPacketInBlock;
- }
-
- // processes response status from all datanodes.
+ // processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && clientRunning; i--) {
final DataTransferProtocol.Status reply = ack.getReply(i);
if (reply != SUCCESS) {
@@ -2938,12 +2969,24 @@
targets[i].getName());
}
}
+
+ assert seqno != PipelineAck.UNKOWN_SEQNO :
+ "Ack for unknown seqno should be a failed ack: " + ack;
+ if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
+ continue;
+ }
- if (one == null) {
- throw new IOException("Panic: responder did not receive " +
- "an ack for a packet: " + seqno);
+ // a success ack for a data packet
+ Packet one = null;
+ synchronized (dataQueue) {
+ one = ackQueue.getFirst();
}
-
+ if (one.seqno != seqno) {
+ throw new IOException("Responseprocessor: Expecting seqno " +
+ one.seqno + " for block " + block +
+ " but received " + seqno);
+ }
+ isLastPacketInBlock = one.lastPacketInBlock;
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
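
On the ack path, ResponseProcessor now walks the per-datanode reply statuses before it looks at the sequence number, so even a failed heartbeat ack still pinpoints the bad datanode; only then are heartbeat acks skipped, since no packet was ever queued for them. A compact sketch of the sequence-number conventions, with constants mirroring the patch:

    /** Sketch of the seqno conventions after this patch; not committed code. */
    class SeqnoConventions {
      static final long HEART_BEAT_SEQNO = -1L; // client-fabricated heartbeat packets
      static final long UNKOWN_SEQNO = -2L;     // responder never read an ack (patch spelling)

      static boolean isHeartbeatAck(long seqno) {
        assert seqno != UNKOWN_SEQNO : "an unknown seqno should only ride a failed ack";
        return seqno == HEART_BEAT_SEQNO;       // nothing sits in ackQueue for a heartbeat
      }
    }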
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Dec 15 06:08:39 2009
@@ -458,7 +458,7 @@
public static class PipelineAck implements Writable {
private long seqno;
private Status replies[];
- final public static PipelineAck HEART_BEAT = new PipelineAck(-1, new Status[0]);
+ public final static long UNKOWN_SEQNO = -2;
/** default constructor **/
public PipelineAck() {
@@ -495,6 +495,10 @@
* @return the ith reply
*/
public Status getReply(int i) {
+ if (i<0 || i>=replies.length) {
+ throw new IllegalArgumentException("The input parameter " + i +
+ " should in the range of [0, " + replies.length);
+ }
return replies[i];
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Dec 15 06:08:39 2009
@@ -560,7 +560,7 @@
throttler.throttle(len);
}
- return len;
+ return lastPacketInBlock?-1:len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -589,9 +589,9 @@
}
/*
- * Receive until packet has zero bytes of data.
+ * Receive until the last packet.
*/
- while (receivePacket() > 0) {}
+ while (receivePacket() >= 0) {}
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
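
The return-value change above is what lets heartbeats traverse the datanodes: before this patch, receivePacket() returning 0 ended the block, so any zero-byte packet stopped reception; now only the packet flagged lastPacketInBlock does, and a zero-byte heartbeat in mid-stream is simply mirrored downstream. A toy model of the new loop contract (receivePacket() here is a stand-in, not the real method):

    /** Toy model of the BlockReceiver loop contract after this patch. */
    class ReceiveLoop {
      // {dataLen, isLast} pairs standing in for packets on the wire:
      // a data packet, a zero-byte heartbeat, then the flagged last packet.
      private final int[][] packets = { {512, 0}, {0, 0}, {0, 1} };
      private int next = 0;

      int receivePacket() {               // -1 once the flagged last packet is handled
        int[] p = packets[next++];
        return p[1] == 1 ? -1 : p[0];
      }

      void receiveBlock() {
        while (receivePacket() >= 0) { }  // was "> 0", which a heartbeat would break
      }
    }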
@@ -775,118 +775,11 @@
notifyAll();
}
- private synchronized void lastDataNodeRun() {
- long lastHeartbeat = System.currentTimeMillis();
- boolean lastPacket = false;
- final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
- while (running && datanode.shouldRun && !lastPacket) {
- long now = System.currentTimeMillis();
- try {
-
- // wait for a packet to be sent to downstream datanode
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
- long idle = now - lastHeartbeat;
- long timeout = (datanode.socketTimeout/2) - idle;
- if (timeout <= 0) {
- timeout = 1000;
- }
- try {
- wait(timeout);
- } catch (InterruptedException e) {
- if (running) {
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " Interrupted.");
- running = false;
- }
- break;
- }
-
- // send a heartbeat if it is time.
- now = System.currentTimeMillis();
- if (now - lastHeartbeat > datanode.socketTimeout/2) {
- PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
- replyOut.flush();
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " sent a heartbeat");
- }
- lastHeartbeat = now;
- }
- }
-
- if (!running || !datanode.shouldRun) {
- break;
- }
- Packet pkt = ackQueue.getFirst();
- long expected = pkt.seqno;
- LOG.debug("PacketResponder " + numTargets +
- " for block " + block +
- " acking for packet " + expected);
-
- // If this is the last packet in block, then close block
- // file and finalize the block before responding success
- if (pkt.lastPacketInBlock) {
- receiver.close();
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
- block.setNumBytes(replicaInfo.getNumBytes());
- datanode.data.finalizeBlock(block);
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
- if (ClientTraceLog.isInfoEnabled() &&
- receiver.clientName.length() > 0) {
- long offset = 0;
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
- "HDFS_WRITE", receiver.clientName, offset,
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
- } else {
- LOG.info("Received block " + block +
- " of size " + block.getNumBytes() +
- " from " + receiver.inAddr);
- }
- lastPacket = true;
- }
-
- new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
- replyOut.flush();
- // remove the packet from the ack queue
- removeAckHead();
- // update the bytes acked
- if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
- }
- } catch (Exception e) {
- LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
- if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
- ioe);
- }
- LOG.info("PacketResponder " + block + " " + numTargets +
- " Exception " + StringUtils.stringifyException(e));
- running = false;
- }
- }
- }
- LOG.info("PacketResponder " + numTargets +
- " for block " + block + " terminating");
- }
-
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
-
- // If this is the last datanode in pipeline, then handle differently
- if (numTargets == 0) {
- lastDataNodeRun();
- return;
- }
-
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
@@ -897,19 +790,18 @@
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
- try {
- // read an ack from downstream datanode
- ack.readFields(mirrorIn);
- if (LOG.isDebugEnabled()) {
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ long seqno = PipelineAck.UNKOWN_SEQNO;
+ try {
+ if (numTargets != 0) {// not the last DN
+ // read an ack from downstream datanode
+ ack.readFields(mirrorIn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
+ }
+ seqno = ack.getSeqno();
+ didRead = true;
}
- long seqno = ack.getSeqno();
- didRead = true;
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
- ack.write(replyOut);
- replyOut.flush();
- continue;
- } else if (seqno >= 0) {
+ if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
@@ -925,9 +817,12 @@
throw e;
}
}
+ if (!running || !datanode.shouldRun) {
+ break;
+ }
pkt = ackQueue.getFirst();
expected = pkt.seqno;
- if (seqno != expected) {
+ if (numTargets > 0 && seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
" expected seqno:" + expected +
@@ -983,14 +878,15 @@
// construct my ack message
Status[] replies = null;
- if (!didRead) { // no ack is read
+ if (!didRead && numTargets != 0) { // ack read error
replies = new Status[2];
replies[0] = SUCCESS;
replies[1] = ERROR;
} else {
- replies = new Status[1+ack.getNumOfReplies()];
+ short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+ replies = new Status[1+ackLen];
replies[0] = SUCCESS;
- for (int i=0; i<ack.getNumOfReplies(); i++) {
+ for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
}
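
With lastDataNodeRun() deleted, every PacketResponder runs the same loop: the tail datanode (numTargets == 0) acks as soon as a packet reaches its ackQueue, an upstream node first reads the downstream ack, and each node prepends its own status to whatever arrived from below, reporting [SUCCESS, ERROR] when the downstream read failed. A sketch of the unified reply construction, with a stand-in Status enum:

    /** Sketch of the unified ack aggregation after this patch; not committed code. */
    class AckAggregation {
      enum Status { SUCCESS, ERROR }

      static Status[] buildReplies(boolean didRead, int numTargets, Status[] downstream) {
        if (!didRead && numTargets != 0) {
          return new Status[] { Status.SUCCESS, Status.ERROR };  // downstream read failed
        }
        int ackLen = (numTargets == 0) ? 0 : downstream.length;  // tail DN has no downstream
        Status[] replies = new Status[1 + ackLen];
        replies[0] = Status.SUCCESS;                             // this node's own verdict
        for (int i = 0; i < ackLen; i++) {
          replies[i + 1] = downstream[i];
        }
        return replies;
      }
    }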
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Tue Dec 15 06:08:39 2009
@@ -99,12 +99,6 @@
&& args(acked)
&& this(pr);
- pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) :
- call (void ReplicaInPipelineInterface.setBytesAcked(long))
- && withincode (void PacketResponder.lastDataNodeRun())
- && args(acked)
- && this(pr);
-
after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
if (pTest == null) {
@@ -117,19 +111,7 @@
bytesAckedService((PipelinesTest)pTest, pr, acked);
}
}
- after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
- PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
- if (pTest == null) {
- LOG.debug("FI: no pipeline has been found in acking");
- return;
- }
- LOG.debug("FI: Acked total bytes from (last DN): " +
- pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
- if (pTest instanceof PipelinesTest) {
- bytesAckedService((PipelinesTest)pTest, pr, acked);
- }
- }
-
+
private void bytesAckedService
(final PipelinesTest pTest, final PacketResponder pr, final long acked) {
NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
Added: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java?rev=890655&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java (added)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java Tue Dec 15 06:08:39 2009
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiPipelineClose {
+ static final short REPLICATION = 3;
+ static final long BLOCKSIZE = 1L * (1L << 20);
+
+ static final Configuration conf = new HdfsConfiguration();
+ static {
+ conf.setInt("dfs.datanode.handler.count", 1);
+ conf.setInt("dfs.replication", REPLICATION);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+ }
+
+ static private FSDataOutputStream createFile(FileSystem fs, Path p
+ ) throws IOException {
+ return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+ REPLICATION, BLOCKSIZE);
+ }
+
+ /**
+ * 1. create files with dfs
+ * 2. write 1 byte
+ * 3. close file
+ * 4. open the same file
+ * 5. read the 1 byte and compare results
+ */
+ private static void write1byte(String methodName) throws IOException {
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
+ null);
+ final FileSystem dfs = cluster.getFileSystem();
+ try {
+ final Path p = new Path("/" + methodName + "/foo");
+ final FSDataOutputStream out = createFile(dfs, p);
+ out.write(1);
+ out.close();
+
+ final FSDataInputStream in = dfs.open(p);
+ final int b = in.read();
+ in.close();
+ Assert.assertEquals(1, b);
+ }
+ finally {
+ dfs.close();
+ cluster.shutdown();
+ }
+ }
+
+ private static void runPipelineCloseTest(String methodName,
+ Action<DatanodeID> a) throws IOException {
+ FiTestUtil.LOG.info("Running " + methodName + " ...");
+ final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+ .initTest();
+ t.fiPipelineClose.set(a);
+ write1byte(methodName);
+ }
+
+ /**
+ * Pipeline close:
+ * DN0 never responds after receiving the close request from the client.
+ * The client gets an IOException and determines that DN0 is bad.
+ */
+ @Test
+ public void pipeline_Fi_36() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new SleepAction(methodName, 0, 0));
+ }
+
+ /**
+ * Pipeline close:
+ * DN1 never responds after receiving the close request from the client.
+ * The client gets an IOException and determines that DN1 is bad.
+ */
+ @Test
+ public void pipeline_Fi_37() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new SleepAction(methodName, 1, 0));
+ }
+
+ /**
+ * Pipeline close:
+ * DN2 never responds after receiving the close request from the client.
+ * The client gets an IOException and determines that DN2 is bad.
+ */
+ @Test
+ public void pipeline_Fi_38() throws IOException {
+ final String methodName = FiTestUtil.getMethodName();
+ runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
+ }
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=890655&r1=890654&r2=890655&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Tue Dec 15 06:08:39 2009
@@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hdfs;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
@@ -30,6 +34,11 @@
/** Class contains a set of tests to verify the correctness of
* newly introduced {@link FSDataOutputStream#hflush()} method */
public class TestHFlush {
+ {
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private final String fName = "hflushtest.dat";
/** The test uses {@link #doTheJob(Configuration, String, long, short)
@@ -143,4 +152,55 @@
actual[idx] = 0;
}
}
+
+ /** This creates a slow writer whose pauses are as long as the socket
+ * timeout, and checks that pipeline heartbeats keep the pipeline alive
+ */
+ @Test
+ public void testPipelineHeartbeat() throws Exception {
+ final int DATANODE_NUM = 2;
+ final int fileLen = 6;
+ Configuration conf = new HdfsConfiguration();
+ final int timeout = 2000;
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+ timeout);
+
+ final Path p = new Path("/pipelineHeartbeat/foo");
+ System.out.println("p=" + p);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+ DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+ byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+ // create a new file.
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+ stm.write(fileContents, 0, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+ System.out.println("Wrote 1 byte and hflush " + p);
+
+ // write another byte
+ Thread.sleep(timeout);
+ stm.write(fileContents, 1, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 2, 1);
+ Thread.sleep(timeout);
+ stm.hflush();
+
+ stm.write(fileContents, 3, 1);
+ Thread.sleep(timeout);
+ stm.write(fileContents, 4, 1);
+ stm.hflush();
+
+ stm.write(fileContents, 5, 1);
+ Thread.sleep(timeout);
+ stm.close();
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, p, fileLen,
+ fileContents, "Failed to slowly write to a file");
+ }
}