Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/09/12 19:56:15 UTC
svn commit: r694755 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Author: rangadi
Date: Fri Sep 12 10:56:15 2008
New Revision: 694755
URL: http://svn.apache.org/viewvc?rev=694755&view=rev
Log:
HADOOP-3831. Very slow reading clients sometimes failed while reading. (rangadi)
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=694755&r1=694754&r2=694755&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 12 10:56:15 2008
@@ -537,6 +537,9 @@
HADOOP-4112. Handles cleanupTask in JobHistory
(Amareshwari Sriramadasu via ddas)
+ HADOOP-3831. Very slow reading clients sometimes failed while reading.
+ (rangadi)
+
Release 0.18.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=694755&r1=694754&r2=694755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 12 10:56:15 2008
@@ -80,6 +80,7 @@
private int datanodeWriteTimeout;
final int writePacketSize;
private FileSystem.Statistics stats;
+ private int maxBlockAcquireFailures;
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
@@ -162,6 +163,9 @@
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+ this.maxBlockAcquireFailures =
+ conf.getInt("dfs.client.max.block.acquire.failures",
+ MAX_BLOCK_ACQUIRE_FAILURES);
try {
this.ugi = UnixUserGroupInformation.login(conf, true);
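The retry limit is now tunable per client instead of being a compile-time
constant. As a minimal sketch (assuming only the standard Configuration and
FileSystem APIs; the key name is taken from the hunk above), a client reading
over slow or congested links could raise the limit like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class RetryLimitExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Allow more attempts to locate a block before DFSClient gives up.
        // With the key unset, the default stays at the compiled-in
        // MAX_BLOCK_ACQUIRE_FAILURES.
        conf.setInt("dfs.client.max.block.acquire.failures", 10);
        FileSystem fs = FileSystem.get(conf);
        // ... reads through 'fs' now tolerate up to 10 failed attempts
        // to acquire a block before an IOException is thrown.
        fs.close();
      }
    }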
@@ -1473,6 +1477,14 @@
private synchronized int readBuffer(byte buf[], int off, int len)
throws IOException {
IOException ioe;
+
+ /* We retry the current node only once, so this is set to true only here.
+ * The intention is to handle one common case of an error that is not
+ * a failure on the datanode or the client: the DataNode closing the
+ * connection because the client has been idle. If there are other
+ * "non-error" cases, a datanode might be retried by setting this to true again.
+ */
+ boolean retryCurrentNode = true;
while (true) {
// retry as many times as seekToNewSource allows.
@@ -1483,16 +1495,30 @@
currentNode.getName() + " at " + ce.getPos());
reportChecksumFailure(src, currentBlock, currentNode);
ioe = ce;
+ retryCurrentNode = false;
} catch ( IOException e ) {
- LOG.warn("Exception while reading from " + currentBlock +
- " of " + src + " from " + currentNode + ": " +
- StringUtils.stringifyException(e));
+ if (!retryCurrentNode) {
+ LOG.warn("Exception while reading from " + currentBlock +
+ " of " + src + " from " + currentNode + ": " +
+ StringUtils.stringifyException(e));
+ }
ioe = e;
}
- addToDeadNodes(currentNode);
- if (!seekToNewSource(pos)) {
- throw ioe;
+ boolean sourceFound = false;
+ if (retryCurrentNode) {
+ /* possibly retry the same node so that transient errors don't
+ * result in application level failures (e.g. Datanode could have
+ * closed the connection because the client is idle for too long).
+ */
+ sourceFound = seekToBlockSource(pos);
+ } else {
+ addToDeadNodes(currentNode);
+ sourceFound = seekToNewSource(pos);
+ }
+ if (!sourceFound) {
+ throw ioe;
}
+ retryCurrentNode = false;
}
}
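The control flow above is easy to lose among the +/- markers. The following
standalone sketch distills the pattern (readChunk(), reconnectSameNode() and
switchToNewNode() are hypothetical stand-ins for the DFSClient internals):
the first IOException on a node earns one reconnect to the same node, which
absorbs idle-connection closes, and only a subsequent failure marks the node
dead and moves on to another replica.

    // Distilled sketch of the retry-current-node-once pattern in
    // readBuffer(); the three helpers are hypothetical stand-ins,
    // not the literal DFSClient code.
    int readWithOneRetry(byte[] buf, int off, int len) throws IOException {
      boolean retryCurrentNode = true; // one free retry on the same node
      while (true) {
        IOException ioe;
        try {
          return readChunk(buf, off, len);
        } catch (ChecksumException ce) {  // org.apache.hadoop.fs
          ioe = ce;
          retryCurrentNode = false;    // corrupt replica: never retry it
        } catch (IOException e) {
          ioe = e;                     // e.g. DN closed an idle connection
        }
        boolean sourceFound = retryCurrentNode
            ? reconnectSameNode()      // transient error: same node, once
            : switchToNewNode();       // mark node dead, pick another
        if (!sourceFound) {
          throw ioe;                   // out of sources: surface the error
        }
        retryCurrentNode = false;      // further failures switch nodes
      }
    }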
@@ -1554,7 +1580,7 @@
return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) {
String blockInfo = block.getBlock() + " file=" + src;
- if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+ if (failures >= maxBlockAcquireFailures) {
throw new IOException("Could not obtain block: " + blockInfo);
}
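With the constant replaced by the field, the block-acquire loop is bounded by
the configured value. A distilled sketch of that loop follows (chooseNode()
and refetchLocations() are hypothetical stand-ins for the surrounding
DFSClient code):

    // Hypothetical sketch of the bounded acquire loop around the hunk
    // above; not the literal DFSClient code.
    private DNAddrPair acquireBlock(String blockInfo) throws IOException {
      int failures = 0;
      while (true) {
        try {
          return chooseNode();               // may throw IOException
        } catch (IOException ie) {
          if (failures >= maxBlockAcquireFailures) {
            throw new IOException("Could not obtain block: " + blockInfo);
          }
          failures++;
          refetchLocations();                // re-ask the namenode, retry
        }
      }
    }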
@@ -1727,6 +1753,16 @@
}
/**
+ * Same as {@link #seekToNewSource(long)} except that it does not exclude
+ * the current datanode and might connect to the same node.
+ */
+ private synchronized boolean seekToBlockSource(long targetPos)
+ throws IOException {
+ currentNode = blockSeekTo(targetPos);
+ return true;
+ }
+
+ /**
* Seek to given position on a node other than the current node. If
* a node other than the current node is found, then returns true.
* If another node could not be found, then returns false.
Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=694755&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Sep 12 10:56:15 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import junit.framework.TestCase;
+
+
+/**
+ * These tests make sure that DFSClient retries fetching data from DFS
+ * properly in case of errors.
+ */
+public class TestDFSClientRetries extends TestCase {
+
+ // writes 'len' bytes of data to out.
+ private static void writeData(OutputStream out, int len) throws IOException {
+ byte [] buf = new byte[4096*16];
+ while(len > 0) {
+ int toWrite = Math.min(len, buf.length);
+ out.write(buf, 0, toWrite);
+ len -= toWrite;
+ }
+ }
+
+ /**
+ * This makes sure that when the DN closes the client's socket after the
+ * client had successfully connected earlier, the data can still be fetched.
+ */
+ public void testWriteTimeoutAtDataNode() throws IOException,
+ InterruptedException {
+ Configuration conf = new Configuration();
+
+ final int writeTimeout = 100; //milliseconds.
+ // set a very short write timeout for the datanode, so that the test runs fast.
+ conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout);
+ // set a smaller block size
+ final int blockSize = 10*1024*1024;
+ conf.setInt("dfs.block.size", blockSize);
+ conf.setInt("dfs.client.max.block.acquire.failures", 1);
+ // set a small buffer size
+ final int bufferSize = 4096;
+ conf.setInt("io.file.buffer.size", bufferSize);
+
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+
+ try {
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+
+ Path filePath = new Path("/testWriteTimeoutAtDataNode");
+ OutputStream out = fs.create(filePath, true, bufferSize);
+
+ // write a 2 block file.
+ writeData(out, 2*blockSize);
+ out.close();
+
+ byte[] buf = new byte[1024*1024]; // enough to empty TCP buffers.
+
+ InputStream in = fs.open(filePath, bufferSize);
+
+ //first read a few bytes
+ IOUtils.readFully(in, buf, 0, bufferSize/2);
+ // now read a few more chunks of data, sleeping in between:
+ for(int i=0; i<10; i++) {
+ Thread.sleep(2*writeTimeout); // force write timeout at the datanode.
+ // read enough to empty out socket buffers.
+ IOUtils.readFully(in, buf, 0, buf.length);
+ }
+ // successfully read with write timeout on datanodes.
+ in.close();
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ // more tests related to different failure cases can be added here.
+}
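For reference, with the Ant build used by trunk at the time, the new test can
be run on its own (assuming the usual test-core target and testcase property;
adjust if your checkout's build.xml differs):

    ant -Dtestcase=TestDFSClientRetries test-core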