You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/01 01:27:41 UTC
svn commit: r799769 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/DFSClient.java
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Author: szetszwo
Date: Fri Jul 31 23:27:40 2009
New Revision: 799769
URL: http://svn.apache.org/viewvc?rev=799769&view=rev
Log:
HDFS-167. Fix a bug in DFSClient that caused infinite retries on write. Contributed by Bill Zeller.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=799769&r1=799768&r2=799769&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Jul 31 23:27:40 2009
@@ -123,6 +123,9 @@
HDFS-119. Fix a bug in logSync(), which causes NameNode block forever.
(Suresh Srinivas via shv)
+ HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+ (Bill Zeller via szetszwo)
+
Release 0.20.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=799769&r1=799768&r2=799769&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jul 31 23:27:40 2009
@@ -127,8 +127,8 @@
public static final Log LOG = LogFactory.getLog(DFSClient.class);
public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
- final private ClientProtocol namenode;
- final private ClientProtocol rpcNamenode;
+ private ClientProtocol namenode;
+ private ClientProtocol rpcNamenode;
final UnixUserGroupInformation ugi;
volatile boolean clientRunning = true;
Random r = new Random();
@@ -219,6 +219,29 @@
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
+ this(conf, stats);
+ this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
+ this.namenode = createNamenode(this.rpcNamenode);
+ }
+
+ /**
+ * Create a new DFSClient connected to the given namenode
+ * and rpcNamenode objects.
+ *
+ * This constructor was written to allow easy testing of the DFSClient class.
+ * End users will most likely want to use one of the other constructors.
+ */
+ public DFSClient(ClientProtocol namenode, ClientProtocol rpcNamenode,
+ Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
+ this(conf, stats);
+ this.namenode = namenode;
+ this.rpcNamenode = rpcNamenode;
+ }
+
+
+ private DFSClient(Configuration conf, FileSystem.Statistics stats)
+ throws IOException {
this.conf = conf;
this.stats = stats;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
@@ -240,9 +263,6 @@
throw (IOException)(new IOException().initCause(e));
}
- this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
- this.namenode = createNamenode(rpcNamenode);
-
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
this.clientName = "DFSClient_" + taskId;
@@ -2856,7 +2876,7 @@
}
private LocatedBlock locateFollowingBlock(long start) throws IOException {
- int retries = 5;
+ int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
@@ -2872,26 +2892,32 @@
if (ue != e) {
throw ue; // no need to retry these exceptions
}
-
- if (--retries == 0 &&
- !NotReplicatedYetException.class.getName().
+
+
+ if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
- throw e;
- } else {
- LOG.info(StringUtils.stringifyException(e));
- if (System.currentTimeMillis() - localstart > 5000) {
- LOG.info("Waiting for replication for "
- + (System.currentTimeMillis() - localstart) / 1000
- + " seconds");
- }
- try {
- LOG.warn("NotReplicatedYetException sleeping " + src
- + " retries left " + retries);
- Thread.sleep(sleeptime);
- sleeptime *= 2;
- } catch (InterruptedException ie) {
+ if (retries == 0) {
+ throw e;
+ } else {
+ --retries;
+ LOG.info(StringUtils.stringifyException(e));
+ if (System.currentTimeMillis() - localstart > 5000) {
+ LOG.info("Waiting for replication for "
+ + (System.currentTimeMillis() - localstart) / 1000
+ + " seconds");
+ }
+ try {
+ LOG.warn("NotReplicatedYetException sleeping " + src
+ + " retries left " + retries);
+ Thread.sleep(sleeptime);
+ sleeptime *= 2;
+ } catch (InterruptedException ie) {
+ }
}
- }
+ } else {
+ throw e;
+ }
+
}
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=799769&r1=799768&r2=799769&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Jul 31 23:27:40 2009
@@ -21,10 +21,18 @@
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
+import org.apache.hadoop.hdfs.server.common.*;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
import junit.framework.TestCase;
@@ -34,6 +42,8 @@
* properly in case of errors.
*/
public class TestDFSClientRetries extends TestCase {
+ public static final Log LOG =
+ LogFactory.getLog(TestDFSClientRetries.class.getName());
// writes 'len' bytes of data to out.
private static void writeData(OutputStream out, int len) throws IOException {
@@ -97,4 +107,132 @@
}
// more tests related to different failure cases can be added here.
+
+ class TestNameNode implements ClientProtocol
+ {
+ int num_calls = 0;
+
+ // The total number of calls that can be made to addBlock
+ // before an exception is thrown
+ int num_calls_allowed;
+ public final String ADD_BLOCK_EXCEPTION = "Testing exception thrown from"
+ + "TestDFSClientRetries::"
+ + "TestNameNode::addBlock";
+ public final String RETRY_CONFIG
+ = "dfs.client.block.write.locateFollowingBlock.retries";
+
+ public TestNameNode(Configuration conf) throws IOException
+ {
+ // +1 because the configuration value is the number of retries and
+ // the first call is not a retry (e.g., 2 retries == 3 total
+ // calls allowed)
+ this.num_calls_allowed = conf.getInt(RETRY_CONFIG, 5) + 1;
+ }
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion)
+ throws IOException
+ {
+ return versionID;
+ }
+
+ public LocatedBlock addBlock(String src, String clientName)
+ throws IOException
+ {
+ num_calls++;
+ if (num_calls > num_calls_allowed) {
+ throw new IOException("addBlock called more times than "
+ + RETRY_CONFIG
+ + " allows.");
+ } else {
+ throw new RemoteException(NotReplicatedYetException.class.getName(),
+ ADD_BLOCK_EXCEPTION);
+ }
+ }
+
+
+ // The following methods are stub methods that are not needed by this mock class
+
+ public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { return null; }
+
+ public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, short replication, long blockSize) throws IOException {}
+
+ public LocatedBlock append(String src, String clientName) throws IOException { return null; }
+
+ public boolean setReplication(String src, short replication) throws IOException { return false; }
+
+ public void setPermission(String src, FsPermission permission) throws IOException {}
+
+ public void setOwner(String src, String username, String groupname) throws IOException {}
+
+ public void abandonBlock(Block b, String src, String holder) throws IOException {}
+
+ public boolean complete(String src, String clientName) throws IOException { return false; }
+
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
+
+ public boolean rename(String src, String dst) throws IOException { return false; }
+
+ public boolean delete(String src) throws IOException { return false; }
+
+ public boolean delete(String src, boolean recursive) throws IOException { return false; }
+
+ public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+
+ public FileStatus[] getListing(String src) throws IOException { return null; }
+
+ public void renewLease(String clientName) throws IOException {}
+
+ public long[] getStats() throws IOException { return null; }
+
+ public DatanodeInfo[] getDatanodeReport(FSConstants.DatanodeReportType type) throws IOException { return null; }
+
+ public long getPreferredBlockSize(String filename) throws IOException { return 0; }
+
+ public boolean setSafeMode(FSConstants.SafeModeAction action) throws IOException { return false; }
+
+ public void saveNamespace() throws IOException {}
+
+ public boolean restoreFailedStorage(String arg) throws AccessControlException { return false; }
+
+ public void refreshNodes() throws IOException {}
+
+ public void finalizeUpgrade() throws IOException {}
+
+ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) throws IOException { return null; }
+
+ public void metaSave(String filename) throws IOException {}
+
+ public FileStatus getFileInfo(String src) throws IOException { return null; }
+
+ public ContentSummary getContentSummary(String path) throws IOException { return null; }
+
+ public void setQuota(String path, long namespaceQuota, long diskspaceQuota) throws IOException {}
+
+ public void fsync(String src, String client) throws IOException {}
+
+ public void setTimes(String src, long mtime, long atime) throws IOException {}
+
+ }
+
+ public void testNotYetReplicatedErrors() throws IOException
+ {
+ Configuration conf = new Configuration();
+
+ // allow 1 retry (2 total calls)
+ conf.setInt("dfs.client.block.write.locateFollowingBlock.retries", 1);
+
+ TestNameNode tnn = new TestNameNode(conf);
+ DFSClient client = new DFSClient(tnn, tnn, conf, null);
+ OutputStream os = client.create("testfile", true);
+ os.write(20); // write one random byte
+
+ try {
+ os.close();
+ } catch (Exception e) {
+ assertTrue("Retries are not being stopped correctly",
+ e.getMessage().equals(tnn.ADD_BLOCK_EXCEPTION));
+ }
+ }
+
}